diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 158b5d1f6f0..c0bd6c02508 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -112,6 +112,11 @@ func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookCon return nil } +func (c *Controller) PreShutdownHook() error { + c.Stop() + return nil +} + // Start begins the core controller loops that must exist for bootstrapping // a cluster. func (c *Controller) Start() { @@ -140,6 +145,14 @@ func (c *Controller) Start() { c.runner.Start() } +func (c *Controller) Stop() { + if c.runner != nil { + c.runner.Stop() + } + endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) + c.EndpointReconciler.StopReconciling("kubernetes", c.PublicIP, endpointPorts) +} + // RunKubernetesNamespaces periodically makes sure that all internal namespaces exist func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) { wait.Until(func() { diff --git a/pkg/master/master.go b/pkg/master/master.go index b5a559dfd05..d3da15fef75 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -372,9 +372,11 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic. } if c.ExtraConfig.EnableCoreControllers { + controllerName := "bootstrap-controller" coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient) - m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", bootstrapController.PostStartHook) + m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) + m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) } if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { diff --git a/pkg/master/reconcilers/BUILD b/pkg/master/reconcilers/BUILD index f24446b9798..ae48372d4f0 100644 --- a/pkg/master/reconcilers/BUILD +++ b/pkg/master/reconcilers/BUILD @@ -22,6 +22,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library", + "//vendor/k8s.io/client-go/util/retry:go_default_library", ], ) diff --git a/pkg/master/reconcilers/lease.go b/pkg/master/reconcilers/lease.go index 75e8d64c4a0..87fd033c420 100644 --- a/pkg/master/reconcilers/lease.go +++ b/pkg/master/reconcilers/lease.go @@ -24,6 +24,7 @@ https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c import ( "fmt" "net" + "sync" "time" "github.com/golang/glog" @@ -44,6 +45,9 @@ type Leases interface { // UpdateLease adds or refreshes a master's lease UpdateLease(ip string) error + + // RemoveLease removes a master's lease + RemoveLease(ip string) error } type storageLeases struct { @@ -96,6 +100,11 @@ func (s *storageLeases) UpdateLease(ip string) error { }) } +// RemoveLease removes the lease on a master IP in storage +func (s *storageLeases) RemoveLease(ip string) error { + return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, nil) +} + // NewLeases creates a new etcd-based Leases implementation. func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases { return &storageLeases{ @@ -106,15 +115,18 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio } type leaseEndpointReconciler struct { - endpointRegistry endpoint.Registry - masterLeases Leases + endpointRegistry endpoint.Registry + masterLeases Leases + stopReconcilingCalled bool + reconcilingLock sync.Mutex } // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases Leases) EndpointReconciler { return &leaseEndpointReconciler{ - endpointRegistry: endpointRegistry, - masterLeases: masterLeases, + endpointRegistry: endpointRegistry, + masterLeases: masterLeases, + stopReconcilingCalled: false, } } @@ -126,7 +138,12 @@ func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases // different from the directory listing, and update the endpoints object // accordingly. func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { - ctx := apirequest.NewDefaultContext() + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + + if r.stopReconcilingCalled { + return nil + } // Refresh the TTL on our key, independently of whether any error or // update conflict happens below. This makes sure that at least some of @@ -135,6 +152,12 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net. return err } + return r.doReconcile(serviceName, endpointPorts, reconcilePorts) +} + +func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []api.EndpointPort, reconcilePorts bool) error { + ctx := apirequest.NewDefaultContext() + // Retrieve the current list of endpoints... e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{}) if err != nil { @@ -249,3 +272,15 @@ func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string, return true, ipsCorrect, portsCorrect } + +func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + r.stopReconcilingCalled = true + + if err := r.masterLeases.RemoveLease(ip.String()); err != nil { + return err + } + + return r.doReconcile(serviceName, endpointPorts, true) +} diff --git a/pkg/master/reconcilers/lease_test.go b/pkg/master/reconcilers/lease_test.go index 5f2c66ee1e4..1cdab4e4b53 100644 --- a/pkg/master/reconcilers/lease_test.go +++ b/pkg/master/reconcilers/lease_test.go @@ -54,6 +54,11 @@ func (f *fakeLeases) UpdateLease(ip string) error { return nil } +func (f *fakeLeases) RemoveLease(ip string) error { + delete(f.keys, ip) + return nil +} + func (f *fakeLeases) SetKeys(keys []string) { for _, ip := range keys { f.keys[ip] = false @@ -529,3 +534,100 @@ func TestLeaseEndpointReconciler(t *testing.T) { } } } + +func TestLeaseStopReconciling(t *testing.T) { + ns := api.NamespaceDefault + om := func(name string) metav1.ObjectMeta { + return metav1.ObjectMeta{Namespace: ns, Name: name} + } + stopTests := []struct { + testName string + serviceName string + ip string + endpointPorts []api.EndpointPort + endpointKeys []string + endpoints *api.EndpointsList + expectUpdate *api.Endpoints // nil means none expected + }{ + { + testName: "successful stop reconciling", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + { + testName: "stop reconciling with ip not in endpoint ip list", + serviceName: "foo", + ip: "5.6.7.8", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + }, + } + for _, test := range stopTests { + fakeLeases := newFakeLeases() + fakeLeases.SetKeys(test.endpointKeys) + registry := ®istrytest.EndpointRegistry{ + Endpoints: test.endpoints, + } + r := NewLeaseEndpointReconciler(registry, fakeLeases) + err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + if test.expectUpdate != nil { + if len(registry.Updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) + } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectUpdate == nil && len(registry.Updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) + } + for _, key := range fakeLeases.GetUpdatedKeys() { + if key == test.ip { + t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key) + } + } + } +} diff --git a/pkg/master/reconcilers/mastercount.go b/pkg/master/reconcilers/mastercount.go index 5984fec2d60..8d5493e59dc 100644 --- a/pkg/master/reconcilers/mastercount.go +++ b/pkg/master/reconcilers/mastercount.go @@ -19,10 +19,12 @@ package reconcilers import ( "net" + "sync" "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/endpoints" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" @@ -31,8 +33,10 @@ import ( // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of // masters. masterCountEndpointReconciler implements EndpointReconciler. type masterCountEndpointReconciler struct { - masterCount int - endpointClient coreclient.EndpointsGetter + masterCount int + endpointClient coreclient.EndpointsGetter + stopReconcilingCalled bool + reconcilingLock sync.Mutex } // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a @@ -57,6 +61,13 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient // to be running (c.masterCount). // * ReconcileEndpoints is called periodically from all apiservers. func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + + if r.stopReconcilingCalled { + return nil + } + e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) if err != nil { e = &api.Endpoints{ @@ -126,6 +137,36 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i return err } +func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + r.stopReconcilingCalled = true + + e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + // Endpoint doesn't exist + return nil + } + return err + } + + // Remove our IP from the list of addresses + new := []api.EndpointAddress{} + for _, addr := range e.Subsets[0].Addresses { + if addr.IP != ip.String() { + new = append(new, addr) + } + } + e.Subsets[0].Addresses = new + e.Subsets = endpoints.RepackSubsets(e.Subsets) + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + _, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) + return err + }) + return err +} + // Determine if the endpoint is in the format ReconcileEndpoints expects. // // Return values: diff --git a/pkg/master/reconcilers/none.go b/pkg/master/reconcilers/none.go index 33e65ab7db0..ca73e1a5913 100644 --- a/pkg/master/reconcilers/none.go +++ b/pkg/master/reconcilers/none.go @@ -36,3 +36,8 @@ func NewNoneEndpointReconciler() EndpointReconciler { func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { return nil } + +// StopReconciling noop reconcile +func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { + return nil +} diff --git a/pkg/master/reconcilers/reconcilers.go b/pkg/master/reconcilers/reconcilers.go index 346fa8fb529..abaabaa03c0 100644 --- a/pkg/master/reconcilers/reconcilers.go +++ b/pkg/master/reconcilers/reconcilers.go @@ -36,6 +36,7 @@ type EndpointReconciler interface { // endpoints for their {rw, ro} services. // * ReconcileEndpoints is called periodically from all apiservers. ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error + StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error } // Type the reconciler type diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index f0e3ac3d480..353862cfc41 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -77,6 +77,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index ebf621636be..e1c81c7d4a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -460,6 +460,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G openAPIConfig: c.OpenAPIConfig, postStartHooks: map[string]postStartHookEntry{}, + preShutdownHooks: map[string]preShutdownHookEntry{}, disabledPostStartHooks: c.DisabledPostStartHooks, healthzChecks: c.HealthzChecks, @@ -473,8 +474,12 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G s.postStartHooks[k] = v } + for k, v := range delegationTarget.PreShutdownHooks() { + s.preShutdownHooks[k] = v + } + genericApiServerHookName := "generic-apiserver-start-informers" - if c.SharedInformerFactory != nil && !s.isHookRegistered(genericApiServerHookName) { + if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) { err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error { c.SharedInformerFactory.Start(context.StopCh) return nil diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index ad24415bc03..1927f7a8a0d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -134,6 +134,10 @@ type GenericAPIServer struct { postStartHooksCalled bool disabledPostStartHooks sets.String + preShutdownHookLock sync.Mutex + preShutdownHooks map[string]preShutdownHookEntry + preShutdownHooksCalled bool + // healthz checks healthzLock sync.Mutex healthzChecks []healthz.HealthzChecker @@ -163,6 +167,9 @@ type DelegationTarget interface { // PostStartHooks returns the post-start hooks that need to be combined PostStartHooks() map[string]postStartHookEntry + // PreShutdownHooks returns the pre-stop hooks that need to be combined + PreShutdownHooks() map[string]preShutdownHookEntry + // HealthzChecks returns the healthz checks that need to be combined HealthzChecks() []healthz.HealthzChecker @@ -180,6 +187,9 @@ func (s *GenericAPIServer) UnprotectedHandler() http.Handler { func (s *GenericAPIServer) PostStartHooks() map[string]postStartHookEntry { return s.postStartHooks } +func (s *GenericAPIServer) PreShutdownHooks() map[string]preShutdownHookEntry { + return s.preShutdownHooks +} func (s *GenericAPIServer) HealthzChecks() []healthz.HealthzChecker { return s.healthzChecks } @@ -205,6 +215,9 @@ func (s emptyDelegate) UnprotectedHandler() http.Handler { func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry { return map[string]postStartHookEntry{} } +func (s emptyDelegate) PreShutdownHooks() map[string]preShutdownHookEntry { + return map[string]preShutdownHookEntry{} +} func (s emptyDelegate) HealthzChecks() []healthz.HealthzChecker { return []healthz.HealthzChecker{} } @@ -264,7 +277,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { s.GenericAPIServer.AuditBackend.Shutdown() } - return nil + return s.RunPreShutdownHooks() } // NonBlockingRun spawns the secure http server. An error is diff --git a/staging/src/k8s.io/apiserver/pkg/server/hooks.go b/staging/src/k8s.io/apiserver/pkg/server/hooks.go index a190f562202..ccf8ee17ad0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/hooks.go +++ b/staging/src/k8s.io/apiserver/pkg/server/hooks.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" + utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/server/healthz" restclient "k8s.io/client-go/rest" @@ -39,6 +40,9 @@ import ( // until it becomes easier to use. type PostStartHookFunc func(context PostStartHookContext) error +// PreShutdownHookFunc is a function that can be added to the shutdown logic. +type PreShutdownHookFunc func() error + // PostStartHookContext provides information about this API server to a PostStartHookFunc type PostStartHookContext struct { // LoopbackClientConfig is a config for a privileged loopback connection to the API server @@ -59,6 +63,10 @@ type postStartHookEntry struct { done chan struct{} } +type preShutdownHookEntry struct { + hook PreShutdownHookFunc +} + // AddPostStartHook allows you to add a PostStartHook. func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) error { if len(name) == 0 { @@ -97,6 +105,37 @@ func (s *GenericAPIServer) AddPostStartHookOrDie(name string, hook PostStartHook } } +// AddPreShutdownHook allows you to add a PreShutdownHook. +func (s *GenericAPIServer) AddPreShutdownHook(name string, hook PreShutdownHookFunc) error { + if len(name) == 0 { + return fmt.Errorf("missing name") + } + if hook == nil { + return nil + } + + s.preShutdownHookLock.Lock() + defer s.preShutdownHookLock.Unlock() + + if s.preShutdownHooksCalled { + return fmt.Errorf("unable to add %q because PreShutdownHooks have already been called", name) + } + if _, exists := s.preShutdownHooks[name]; exists { + return fmt.Errorf("unable to add %q because it is already registered", name) + } + + s.preShutdownHooks[name] = preShutdownHookEntry{hook: hook} + + return nil +} + +// AddPreShutdownHookOrDie allows you to add a PostStartHook, but dies on failure +func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdownHookFunc) { + if err := s.AddPreShutdownHook(name, hook); err != nil { + glog.Fatalf("Error registering PreShutdownHook %q: %v", name, err) + } +} + // RunPostStartHooks runs the PostStartHooks for the server func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) { s.postStartHookLock.Lock() @@ -113,8 +152,24 @@ func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) { } } -// isHookRegistered checks whether a given hook is registered -func (s *GenericAPIServer) isHookRegistered(name string) bool { +// RunPreShutdownHooks runs the PreShutdownHooks for the server +func (s *GenericAPIServer) RunPreShutdownHooks() error { + var errorList []error + + s.preShutdownHookLock.Lock() + defer s.preShutdownHookLock.Unlock() + s.preShutdownHooksCalled = true + + for hookName, hookEntry := range s.preShutdownHooks { + if err := runPreShutdownHook(hookName, hookEntry); err != nil { + errorList = append(errorList, err) + } + } + return utilerrors.NewAggregate(errorList) +} + +// isPostStartHookRegistered checks whether a given PostStartHook is registered +func (s *GenericAPIServer) isPostStartHookRegistered(name string) bool { s.postStartHookLock.Lock() defer s.postStartHookLock.Unlock() _, exists := s.postStartHooks[name] @@ -135,6 +190,19 @@ func runPostStartHook(name string, entry postStartHookEntry, context PostStartHo close(entry.done) } +func runPreShutdownHook(name string, entry preShutdownHookEntry) error { + var err error + func() { + // don't let the hook *accidentally* panic and kill the server + defer utilruntime.HandleCrash() + err = entry.hook() + }() + if err != nil { + return fmt.Errorf("PreShutdownHook %q failed: %v", name, err) + } + return nil +} + // postStartHookHealthz implements a healthz check for poststarthooks. It will return a "hookNotFinished" // error until the poststarthook is finished. type postStartHookHealthz struct {