Merge pull request #53821 from rrati/apiserver-clean-shutdown

Automatic merge from submit-queue (batch tested with PRs 54145, 53821). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Added PreStopHooks to apiserver to allow clean shutdown.  

BootStrapController now registers a PreStopHook to clean up the kubernetes service endpoints.  The PreStopHooks allow the apiserver to shutdown cleanly under a controlled shutdown case.  The BootStrapController's PreStopHook will clean up after itself by removing the apiserver from the list of IPs in the kubernetes service.

fixes #53438
This commit is contained in:
Kubernetes Submit Queue 2017-10-19 06:50:13 -07:00 committed by GitHub
commit 78ada62c30
12 changed files with 299 additions and 12 deletions

View File

@ -112,6 +112,11 @@ func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookCon
return nil return nil
} }
func (c *Controller) PreShutdownHook() error {
c.Stop()
return nil
}
// Start begins the core controller loops that must exist for bootstrapping // Start begins the core controller loops that must exist for bootstrapping
// a cluster. // a cluster.
func (c *Controller) Start() { func (c *Controller) Start() {
@ -140,6 +145,14 @@ func (c *Controller) Start() {
c.runner.Start() c.runner.Start()
} }
func (c *Controller) Stop() {
if c.runner != nil {
c.runner.Stop()
}
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
c.EndpointReconciler.StopReconciling("kubernetes", c.PublicIP, endpointPorts)
}
// RunKubernetesNamespaces periodically makes sure that all internal namespaces exist // RunKubernetesNamespaces periodically makes sure that all internal namespaces exist
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) { func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
wait.Until(func() { wait.Until(func() {

View File

@ -372,9 +372,11 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
} }
if c.ExtraConfig.EnableCoreControllers { if c.ExtraConfig.EnableCoreControllers {
controllerName := "bootstrap-controller"
coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", bootstrapController.PostStartHook) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
} }
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {

View File

@ -22,6 +22,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
], ],
) )

View File

@ -24,6 +24,7 @@ https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c
import ( import (
"fmt" "fmt"
"net" "net"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -44,6 +45,9 @@ type Leases interface {
// UpdateLease adds or refreshes a master's lease // UpdateLease adds or refreshes a master's lease
UpdateLease(ip string) error UpdateLease(ip string) error
// RemoveLease removes a master's lease
RemoveLease(ip string) error
} }
type storageLeases struct { type storageLeases struct {
@ -96,6 +100,11 @@ func (s *storageLeases) UpdateLease(ip string) error {
}) })
} }
// RemoveLease removes the lease on a master IP in storage
func (s *storageLeases) RemoveLease(ip string) error {
return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, nil)
}
// NewLeases creates a new etcd-based Leases implementation. // NewLeases creates a new etcd-based Leases implementation.
func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases { func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
return &storageLeases{ return &storageLeases{
@ -106,15 +115,18 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio
} }
type leaseEndpointReconciler struct { type leaseEndpointReconciler struct {
endpointRegistry endpoint.Registry endpointRegistry endpoint.Registry
masterLeases Leases masterLeases Leases
stopReconcilingCalled bool
reconcilingLock sync.Mutex
} }
// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases Leases) EndpointReconciler { func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases Leases) EndpointReconciler {
return &leaseEndpointReconciler{ return &leaseEndpointReconciler{
endpointRegistry: endpointRegistry, endpointRegistry: endpointRegistry,
masterLeases: masterLeases, masterLeases: masterLeases,
stopReconcilingCalled: false,
} }
} }
@ -126,7 +138,12 @@ func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases
// different from the directory listing, and update the endpoints object // different from the directory listing, and update the endpoints object
// accordingly. // accordingly.
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := apirequest.NewDefaultContext() r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled {
return nil
}
// Refresh the TTL on our key, independently of whether any error or // Refresh the TTL on our key, independently of whether any error or
// update conflict happens below. This makes sure that at least some of // update conflict happens below. This makes sure that at least some of
@ -135,6 +152,12 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.
return err return err
} }
return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := apirequest.NewDefaultContext()
// Retrieve the current list of endpoints... // Retrieve the current list of endpoints...
e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{}) e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{})
if err != nil { if err != nil {
@ -249,3 +272,15 @@ func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string,
return true, ipsCorrect, portsCorrect return true, ipsCorrect, portsCorrect
} }
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
return err
}
return r.doReconcile(serviceName, endpointPorts, true)
}

View File

@ -54,6 +54,11 @@ func (f *fakeLeases) UpdateLease(ip string) error {
return nil return nil
} }
func (f *fakeLeases) RemoveLease(ip string) error {
delete(f.keys, ip)
return nil
}
func (f *fakeLeases) SetKeys(keys []string) { func (f *fakeLeases) SetKeys(keys []string) {
for _, ip := range keys { for _, ip := range keys {
f.keys[ip] = false f.keys[ip] = false
@ -529,3 +534,100 @@ func TestLeaseEndpointReconciler(t *testing.T) {
} }
} }
} }
func TestLeaseStopReconciling(t *testing.T) {
ns := api.NamespaceDefault
om := func(name string) metav1.ObjectMeta {
return metav1.ObjectMeta{Namespace: ns, Name: name}
}
stopTests := []struct {
testName string
serviceName string
ip string
endpointPorts []api.EndpointPort
endpointKeys []string
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
}{
{
testName: "successful stop reconciling",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "stop reconciling with ip not in endpoint ip list",
serviceName: "foo",
ip: "5.6.7.8",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
},
}
for _, test := range stopTests {
fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys)
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
}
r := NewLeaseEndpointReconciler(registry, fakeLeases)
err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
}
for _, key := range fakeLeases.GetUpdatedKeys() {
if key == test.ip {
t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key)
}
}
}
}

View File

@ -19,10 +19,12 @@ package reconcilers
import ( import (
"net" "net"
"sync"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints" "k8s.io/kubernetes/pkg/api/endpoints"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
@ -31,8 +33,10 @@ import (
// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
// masters. masterCountEndpointReconciler implements EndpointReconciler. // masters. masterCountEndpointReconciler implements EndpointReconciler.
type masterCountEndpointReconciler struct { type masterCountEndpointReconciler struct {
masterCount int masterCount int
endpointClient coreclient.EndpointsGetter endpointClient coreclient.EndpointsGetter
stopReconcilingCalled bool
reconcilingLock sync.Mutex
} }
// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
@ -57,6 +61,13 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient
// to be running (c.masterCount). // to be running (c.masterCount).
// * ReconcileEndpoints is called periodically from all apiservers. // * ReconcileEndpoints is called periodically from all apiservers.
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled {
return nil
}
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
if err != nil { if err != nil {
e = &api.Endpoints{ e = &api.Endpoints{
@ -126,6 +137,36 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
return err return err
} }
func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// Endpoint doesn't exist
return nil
}
return err
}
// Remove our IP from the list of addresses
new := []api.EndpointAddress{}
for _, addr := range e.Subsets[0].Addresses {
if addr.IP != ip.String() {
new = append(new, addr)
}
}
e.Subsets[0].Addresses = new
e.Subsets = endpoints.RepackSubsets(e.Subsets)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
return err
})
return err
}
// Determine if the endpoint is in the format ReconcileEndpoints expects. // Determine if the endpoint is in the format ReconcileEndpoints expects.
// //
// Return values: // Return values:

View File

@ -36,3 +36,8 @@ func NewNoneEndpointReconciler() EndpointReconciler {
func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
return nil return nil
} }
// StopReconciling noop reconcile
func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
return nil
}

View File

@ -36,6 +36,7 @@ type EndpointReconciler interface {
// endpoints for their {rw, ro} services. // endpoints for their {rw, ro} services.
// * ReconcileEndpoints is called periodically from all apiservers. // * ReconcileEndpoints is called periodically from all apiservers.
ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error
StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error
} }
// Type the reconciler type // Type the reconciler type

View File

@ -77,6 +77,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",

View File

@ -460,6 +460,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
openAPIConfig: c.OpenAPIConfig, openAPIConfig: c.OpenAPIConfig,
postStartHooks: map[string]postStartHookEntry{}, postStartHooks: map[string]postStartHookEntry{},
preShutdownHooks: map[string]preShutdownHookEntry{},
disabledPostStartHooks: c.DisabledPostStartHooks, disabledPostStartHooks: c.DisabledPostStartHooks,
healthzChecks: c.HealthzChecks, healthzChecks: c.HealthzChecks,
@ -473,8 +474,12 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
s.postStartHooks[k] = v s.postStartHooks[k] = v
} }
for k, v := range delegationTarget.PreShutdownHooks() {
s.preShutdownHooks[k] = v
}
genericApiServerHookName := "generic-apiserver-start-informers" genericApiServerHookName := "generic-apiserver-start-informers"
if c.SharedInformerFactory != nil && !s.isHookRegistered(genericApiServerHookName) { if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) {
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error { err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
c.SharedInformerFactory.Start(context.StopCh) c.SharedInformerFactory.Start(context.StopCh)
return nil return nil

View File

@ -134,6 +134,10 @@ type GenericAPIServer struct {
postStartHooksCalled bool postStartHooksCalled bool
disabledPostStartHooks sets.String disabledPostStartHooks sets.String
preShutdownHookLock sync.Mutex
preShutdownHooks map[string]preShutdownHookEntry
preShutdownHooksCalled bool
// healthz checks // healthz checks
healthzLock sync.Mutex healthzLock sync.Mutex
healthzChecks []healthz.HealthzChecker healthzChecks []healthz.HealthzChecker
@ -163,6 +167,9 @@ type DelegationTarget interface {
// PostStartHooks returns the post-start hooks that need to be combined // PostStartHooks returns the post-start hooks that need to be combined
PostStartHooks() map[string]postStartHookEntry PostStartHooks() map[string]postStartHookEntry
// PreShutdownHooks returns the pre-stop hooks that need to be combined
PreShutdownHooks() map[string]preShutdownHookEntry
// HealthzChecks returns the healthz checks that need to be combined // HealthzChecks returns the healthz checks that need to be combined
HealthzChecks() []healthz.HealthzChecker HealthzChecks() []healthz.HealthzChecker
@ -180,6 +187,9 @@ func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
func (s *GenericAPIServer) PostStartHooks() map[string]postStartHookEntry { func (s *GenericAPIServer) PostStartHooks() map[string]postStartHookEntry {
return s.postStartHooks return s.postStartHooks
} }
func (s *GenericAPIServer) PreShutdownHooks() map[string]preShutdownHookEntry {
return s.preShutdownHooks
}
func (s *GenericAPIServer) HealthzChecks() []healthz.HealthzChecker { func (s *GenericAPIServer) HealthzChecks() []healthz.HealthzChecker {
return s.healthzChecks return s.healthzChecks
} }
@ -205,6 +215,9 @@ func (s emptyDelegate) UnprotectedHandler() http.Handler {
func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry { func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry {
return map[string]postStartHookEntry{} return map[string]postStartHookEntry{}
} }
func (s emptyDelegate) PreShutdownHooks() map[string]preShutdownHookEntry {
return map[string]preShutdownHookEntry{}
}
func (s emptyDelegate) HealthzChecks() []healthz.HealthzChecker { func (s emptyDelegate) HealthzChecks() []healthz.HealthzChecker {
return []healthz.HealthzChecker{} return []healthz.HealthzChecker{}
} }
@ -264,7 +277,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
s.GenericAPIServer.AuditBackend.Shutdown() s.GenericAPIServer.AuditBackend.Shutdown()
} }
return nil return s.RunPreShutdownHooks()
} }
// NonBlockingRun spawns the secure http server. An error is // NonBlockingRun spawns the secure http server. An error is

View File

@ -23,6 +23,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@ -39,6 +40,9 @@ import (
// until it becomes easier to use. // until it becomes easier to use.
type PostStartHookFunc func(context PostStartHookContext) error type PostStartHookFunc func(context PostStartHookContext) error
// PreShutdownHookFunc is a function that can be added to the shutdown logic.
type PreShutdownHookFunc func() error
// PostStartHookContext provides information about this API server to a PostStartHookFunc // PostStartHookContext provides information about this API server to a PostStartHookFunc
type PostStartHookContext struct { type PostStartHookContext struct {
// LoopbackClientConfig is a config for a privileged loopback connection to the API server // LoopbackClientConfig is a config for a privileged loopback connection to the API server
@ -59,6 +63,10 @@ type postStartHookEntry struct {
done chan struct{} done chan struct{}
} }
type preShutdownHookEntry struct {
hook PreShutdownHookFunc
}
// AddPostStartHook allows you to add a PostStartHook. // AddPostStartHook allows you to add a PostStartHook.
func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) error { func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) error {
if len(name) == 0 { if len(name) == 0 {
@ -97,6 +105,37 @@ func (s *GenericAPIServer) AddPostStartHookOrDie(name string, hook PostStartHook
} }
} }
// AddPreShutdownHook allows you to add a PreShutdownHook.
func (s *GenericAPIServer) AddPreShutdownHook(name string, hook PreShutdownHookFunc) error {
if len(name) == 0 {
return fmt.Errorf("missing name")
}
if hook == nil {
return nil
}
s.preShutdownHookLock.Lock()
defer s.preShutdownHookLock.Unlock()
if s.preShutdownHooksCalled {
return fmt.Errorf("unable to add %q because PreShutdownHooks have already been called", name)
}
if _, exists := s.preShutdownHooks[name]; exists {
return fmt.Errorf("unable to add %q because it is already registered", name)
}
s.preShutdownHooks[name] = preShutdownHookEntry{hook: hook}
return nil
}
// AddPreShutdownHookOrDie allows you to add a PostStartHook, but dies on failure
func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdownHookFunc) {
if err := s.AddPreShutdownHook(name, hook); err != nil {
glog.Fatalf("Error registering PreShutdownHook %q: %v", name, err)
}
}
// RunPostStartHooks runs the PostStartHooks for the server // RunPostStartHooks runs the PostStartHooks for the server
func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) { func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
s.postStartHookLock.Lock() s.postStartHookLock.Lock()
@ -113,8 +152,24 @@ func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
} }
} }
// isHookRegistered checks whether a given hook is registered // RunPreShutdownHooks runs the PreShutdownHooks for the server
func (s *GenericAPIServer) isHookRegistered(name string) bool { func (s *GenericAPIServer) RunPreShutdownHooks() error {
var errorList []error
s.preShutdownHookLock.Lock()
defer s.preShutdownHookLock.Unlock()
s.preShutdownHooksCalled = true
for hookName, hookEntry := range s.preShutdownHooks {
if err := runPreShutdownHook(hookName, hookEntry); err != nil {
errorList = append(errorList, err)
}
}
return utilerrors.NewAggregate(errorList)
}
// isPostStartHookRegistered checks whether a given PostStartHook is registered
func (s *GenericAPIServer) isPostStartHookRegistered(name string) bool {
s.postStartHookLock.Lock() s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock() defer s.postStartHookLock.Unlock()
_, exists := s.postStartHooks[name] _, exists := s.postStartHooks[name]
@ -135,6 +190,19 @@ func runPostStartHook(name string, entry postStartHookEntry, context PostStartHo
close(entry.done) close(entry.done)
} }
func runPreShutdownHook(name string, entry preShutdownHookEntry) error {
var err error
func() {
// don't let the hook *accidentally* panic and kill the server
defer utilruntime.HandleCrash()
err = entry.hook()
}()
if err != nil {
return fmt.Errorf("PreShutdownHook %q failed: %v", name, err)
}
return nil
}
// postStartHookHealthz implements a healthz check for poststarthooks. It will return a "hookNotFinished" // postStartHookHealthz implements a healthz check for poststarthooks. It will return a "hookNotFinished"
// error until the poststarthook is finished. // error until the poststarthook is finished.
type postStartHookHealthz struct { type postStartHookHealthz struct {