diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 158b5d1f6f0..c0bd6c02508 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -112,6 +112,11 @@ func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookCon return nil } +func (c *Controller) PreShutdownHook() error { + c.Stop() + return nil +} + // Start begins the core controller loops that must exist for bootstrapping // a cluster. func (c *Controller) Start() { @@ -140,6 +145,14 @@ func (c *Controller) Start() { c.runner.Start() } +func (c *Controller) Stop() { + if c.runner != nil { + c.runner.Stop() + } + endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) + c.EndpointReconciler.StopReconciling("kubernetes", c.PublicIP, endpointPorts) +} + // RunKubernetesNamespaces periodically makes sure that all internal namespaces exist func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) { wait.Until(func() { diff --git a/pkg/master/master.go b/pkg/master/master.go index 6e3dfe248c4..6e9fb382bd8 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -372,9 +372,11 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic. } if c.ExtraConfig.EnableCoreControllers { + controllerName := "bootstrap-controller" coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient) - m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", bootstrapController.PostStartHook) + m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) + m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) } if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { diff --git a/pkg/master/reconcilers/BUILD b/pkg/master/reconcilers/BUILD index 5030537eb72..2761dd92ef9 100644 --- a/pkg/master/reconcilers/BUILD +++ b/pkg/master/reconcilers/BUILD @@ -21,6 +21,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library", + "//vendor/k8s.io/client-go/util/retry:go_default_library", ], ) diff --git a/pkg/master/reconcilers/lease.go b/pkg/master/reconcilers/lease.go index 75e8d64c4a0..87fd033c420 100644 --- a/pkg/master/reconcilers/lease.go +++ b/pkg/master/reconcilers/lease.go @@ -24,6 +24,7 @@ https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c import ( "fmt" "net" + "sync" "time" "github.com/golang/glog" @@ -44,6 +45,9 @@ type Leases interface { // UpdateLease adds or refreshes a master's lease UpdateLease(ip string) error + + // RemoveLease removes a master's lease + RemoveLease(ip string) error } type storageLeases struct { @@ -96,6 +100,11 @@ func (s *storageLeases) UpdateLease(ip string) error { }) } +// RemoveLease removes the lease on a master IP in storage +func (s *storageLeases) RemoveLease(ip string) error { + return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, nil) +} + // NewLeases creates a new etcd-based Leases implementation. func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases { return &storageLeases{ @@ -106,15 +115,18 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio } type leaseEndpointReconciler struct { - endpointRegistry endpoint.Registry - masterLeases Leases + endpointRegistry endpoint.Registry + masterLeases Leases + stopReconcilingCalled bool + reconcilingLock sync.Mutex } // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases Leases) EndpointReconciler { return &leaseEndpointReconciler{ - endpointRegistry: endpointRegistry, - masterLeases: masterLeases, + endpointRegistry: endpointRegistry, + masterLeases: masterLeases, + stopReconcilingCalled: false, } } @@ -126,7 +138,12 @@ func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases // different from the directory listing, and update the endpoints object // accordingly. func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { - ctx := apirequest.NewDefaultContext() + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + + if r.stopReconcilingCalled { + return nil + } // Refresh the TTL on our key, independently of whether any error or // update conflict happens below. This makes sure that at least some of @@ -135,6 +152,12 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net. return err } + return r.doReconcile(serviceName, endpointPorts, reconcilePorts) +} + +func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []api.EndpointPort, reconcilePorts bool) error { + ctx := apirequest.NewDefaultContext() + // Retrieve the current list of endpoints... e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{}) if err != nil { @@ -249,3 +272,15 @@ func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string, return true, ipsCorrect, portsCorrect } + +func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + r.stopReconcilingCalled = true + + if err := r.masterLeases.RemoveLease(ip.String()); err != nil { + return err + } + + return r.doReconcile(serviceName, endpointPorts, true) +} diff --git a/pkg/master/reconcilers/lease_test.go b/pkg/master/reconcilers/lease_test.go index 5f2c66ee1e4..1cdab4e4b53 100644 --- a/pkg/master/reconcilers/lease_test.go +++ b/pkg/master/reconcilers/lease_test.go @@ -54,6 +54,11 @@ func (f *fakeLeases) UpdateLease(ip string) error { return nil } +func (f *fakeLeases) RemoveLease(ip string) error { + delete(f.keys, ip) + return nil +} + func (f *fakeLeases) SetKeys(keys []string) { for _, ip := range keys { f.keys[ip] = false @@ -529,3 +534,100 @@ func TestLeaseEndpointReconciler(t *testing.T) { } } } + +func TestLeaseStopReconciling(t *testing.T) { + ns := api.NamespaceDefault + om := func(name string) metav1.ObjectMeta { + return metav1.ObjectMeta{Namespace: ns, Name: name} + } + stopTests := []struct { + testName string + serviceName string + ip string + endpointPorts []api.EndpointPort + endpointKeys []string + endpoints *api.EndpointsList + expectUpdate *api.Endpoints // nil means none expected + }{ + { + testName: "successful stop reconciling", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + { + testName: "stop reconciling with ip not in endpoint ip list", + serviceName: "foo", + ip: "5.6.7.8", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + }, + } + for _, test := range stopTests { + fakeLeases := newFakeLeases() + fakeLeases.SetKeys(test.endpointKeys) + registry := ®istrytest.EndpointRegistry{ + Endpoints: test.endpoints, + } + r := NewLeaseEndpointReconciler(registry, fakeLeases) + err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + if test.expectUpdate != nil { + if len(registry.Updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) + } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectUpdate == nil && len(registry.Updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) + } + for _, key := range fakeLeases.GetUpdatedKeys() { + if key == test.ip { + t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key) + } + } + } +} diff --git a/pkg/master/reconcilers/mastercount.go b/pkg/master/reconcilers/mastercount.go index 5984fec2d60..8d5493e59dc 100644 --- a/pkg/master/reconcilers/mastercount.go +++ b/pkg/master/reconcilers/mastercount.go @@ -19,10 +19,12 @@ package reconcilers import ( "net" + "sync" "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/endpoints" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" @@ -31,8 +33,10 @@ import ( // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of // masters. masterCountEndpointReconciler implements EndpointReconciler. type masterCountEndpointReconciler struct { - masterCount int - endpointClient coreclient.EndpointsGetter + masterCount int + endpointClient coreclient.EndpointsGetter + stopReconcilingCalled bool + reconcilingLock sync.Mutex } // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a @@ -57,6 +61,13 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient // to be running (c.masterCount). // * ReconcileEndpoints is called periodically from all apiservers. func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + + if r.stopReconcilingCalled { + return nil + } + e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) if err != nil { e = &api.Endpoints{ @@ -126,6 +137,36 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i return err } +func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + r.stopReconcilingCalled = true + + e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + // Endpoint doesn't exist + return nil + } + return err + } + + // Remove our IP from the list of addresses + new := []api.EndpointAddress{} + for _, addr := range e.Subsets[0].Addresses { + if addr.IP != ip.String() { + new = append(new, addr) + } + } + e.Subsets[0].Addresses = new + e.Subsets = endpoints.RepackSubsets(e.Subsets) + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + _, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) + return err + }) + return err +} + // Determine if the endpoint is in the format ReconcileEndpoints expects. // // Return values: diff --git a/pkg/master/reconcilers/none.go b/pkg/master/reconcilers/none.go index 33e65ab7db0..ca73e1a5913 100644 --- a/pkg/master/reconcilers/none.go +++ b/pkg/master/reconcilers/none.go @@ -36,3 +36,8 @@ func NewNoneEndpointReconciler() EndpointReconciler { func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { return nil } + +// StopReconciling noop reconcile +func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { + return nil +} diff --git a/pkg/master/reconcilers/reconcilers.go b/pkg/master/reconcilers/reconcilers.go index 346fa8fb529..abaabaa03c0 100644 --- a/pkg/master/reconcilers/reconcilers.go +++ b/pkg/master/reconcilers/reconcilers.go @@ -36,6 +36,7 @@ type EndpointReconciler interface { // endpoints for their {rw, ro} services. // * ReconcileEndpoints is called periodically from all apiservers. ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error + StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error } // Type the reconciler type