Added PreShutdownHook functions for endpoint reconcilers

This commit is contained in:
Robert Rati 2017-10-12 21:27:31 -04:00
parent 0840e6d869
commit 00b085ad4a
8 changed files with 208 additions and 8 deletions

View File

@ -112,6 +112,11 @@ func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookCon
return nil return nil
} }
func (c *Controller) PreShutdownHook() error {
c.Stop()
return nil
}
// Start begins the core controller loops that must exist for bootstrapping // Start begins the core controller loops that must exist for bootstrapping
// a cluster. // a cluster.
func (c *Controller) Start() { func (c *Controller) Start() {
@ -140,6 +145,14 @@ func (c *Controller) Start() {
c.runner.Start() c.runner.Start()
} }
func (c *Controller) Stop() {
if c.runner != nil {
c.runner.Stop()
}
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
c.EndpointReconciler.StopReconciling("kubernetes", c.PublicIP, endpointPorts)
}
// RunKubernetesNamespaces periodically makes sure that all internal namespaces exist // RunKubernetesNamespaces periodically makes sure that all internal namespaces exist
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) { func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
wait.Until(func() { wait.Until(func() {

View File

@ -372,9 +372,11 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
} }
if c.ExtraConfig.EnableCoreControllers { if c.ExtraConfig.EnableCoreControllers {
controllerName := "bootstrap-controller"
coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", bootstrapController.PostStartHook) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
} }
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {

View File

@ -21,6 +21,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
], ],
) )

View File

@ -24,6 +24,7 @@ https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c
import ( import (
"fmt" "fmt"
"net" "net"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -44,6 +45,9 @@ type Leases interface {
// UpdateLease adds or refreshes a master's lease // UpdateLease adds or refreshes a master's lease
UpdateLease(ip string) error UpdateLease(ip string) error
// RemoveLease removes a master's lease
RemoveLease(ip string) error
} }
type storageLeases struct { type storageLeases struct {
@ -96,6 +100,11 @@ func (s *storageLeases) UpdateLease(ip string) error {
}) })
} }
// RemoveLease removes the lease on a master IP in storage
func (s *storageLeases) RemoveLease(ip string) error {
return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, nil)
}
// NewLeases creates a new etcd-based Leases implementation. // NewLeases creates a new etcd-based Leases implementation.
func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases { func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
return &storageLeases{ return &storageLeases{
@ -108,6 +117,8 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio
type leaseEndpointReconciler struct { type leaseEndpointReconciler struct {
endpointRegistry endpoint.Registry endpointRegistry endpoint.Registry
masterLeases Leases masterLeases Leases
stopReconcilingCalled bool
reconcilingLock sync.Mutex
} }
// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
@ -115,6 +126,7 @@ func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases
return &leaseEndpointReconciler{ return &leaseEndpointReconciler{
endpointRegistry: endpointRegistry, endpointRegistry: endpointRegistry,
masterLeases: masterLeases, masterLeases: masterLeases,
stopReconcilingCalled: false,
} }
} }
@ -126,7 +138,12 @@ func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases
// different from the directory listing, and update the endpoints object // different from the directory listing, and update the endpoints object
// accordingly. // accordingly.
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := apirequest.NewDefaultContext() r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled {
return nil
}
// Refresh the TTL on our key, independently of whether any error or // Refresh the TTL on our key, independently of whether any error or
// update conflict happens below. This makes sure that at least some of // update conflict happens below. This makes sure that at least some of
@ -135,6 +152,12 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.
return err return err
} }
return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := apirequest.NewDefaultContext()
// Retrieve the current list of endpoints... // Retrieve the current list of endpoints...
e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{}) e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{})
if err != nil { if err != nil {
@ -249,3 +272,15 @@ func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string,
return true, ipsCorrect, portsCorrect return true, ipsCorrect, portsCorrect
} }
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
return err
}
return r.doReconcile(serviceName, endpointPorts, true)
}

View File

@ -54,6 +54,11 @@ func (f *fakeLeases) UpdateLease(ip string) error {
return nil return nil
} }
func (f *fakeLeases) RemoveLease(ip string) error {
delete(f.keys, ip)
return nil
}
func (f *fakeLeases) SetKeys(keys []string) { func (f *fakeLeases) SetKeys(keys []string) {
for _, ip := range keys { for _, ip := range keys {
f.keys[ip] = false f.keys[ip] = false
@ -529,3 +534,100 @@ func TestLeaseEndpointReconciler(t *testing.T) {
} }
} }
} }
func TestLeaseStopReconciling(t *testing.T) {
ns := api.NamespaceDefault
om := func(name string) metav1.ObjectMeta {
return metav1.ObjectMeta{Namespace: ns, Name: name}
}
stopTests := []struct {
testName string
serviceName string
ip string
endpointPorts []api.EndpointPort
endpointKeys []string
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
}{
{
testName: "successful stop reconciling",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "stop reconciling with ip not in endpoint ip list",
serviceName: "foo",
ip: "5.6.7.8",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
},
}
for _, test := range stopTests {
fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys)
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
}
r := NewLeaseEndpointReconciler(registry, fakeLeases)
err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
}
for _, key := range fakeLeases.GetUpdatedKeys() {
if key == test.ip {
t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key)
}
}
}
}

View File

@ -19,10 +19,12 @@ package reconcilers
import ( import (
"net" "net"
"sync"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints" "k8s.io/kubernetes/pkg/api/endpoints"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
@ -33,6 +35,8 @@ import (
type masterCountEndpointReconciler struct { type masterCountEndpointReconciler struct {
masterCount int masterCount int
endpointClient coreclient.EndpointsGetter endpointClient coreclient.EndpointsGetter
stopReconcilingCalled bool
reconcilingLock sync.Mutex
} }
// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
@ -57,6 +61,13 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient
// to be running (c.masterCount). // to be running (c.masterCount).
// * ReconcileEndpoints is called periodically from all apiservers. // * ReconcileEndpoints is called periodically from all apiservers.
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled {
return nil
}
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
if err != nil { if err != nil {
e = &api.Endpoints{ e = &api.Endpoints{
@ -126,6 +137,36 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
return err return err
} }
func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// Endpoint doesn't exist
return nil
}
return err
}
// Remove our IP from the list of addresses
new := []api.EndpointAddress{}
for _, addr := range e.Subsets[0].Addresses {
if addr.IP != ip.String() {
new = append(new, addr)
}
}
e.Subsets[0].Addresses = new
e.Subsets = endpoints.RepackSubsets(e.Subsets)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
return err
})
return err
}
// Determine if the endpoint is in the format ReconcileEndpoints expects. // Determine if the endpoint is in the format ReconcileEndpoints expects.
// //
// Return values: // Return values:

View File

@ -36,3 +36,8 @@ func NewNoneEndpointReconciler() EndpointReconciler {
func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
return nil return nil
} }
// StopReconciling noop reconcile
func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
return nil
}

View File

@ -36,6 +36,7 @@ type EndpointReconciler interface {
// endpoints for their {rw, ro} services. // endpoints for their {rw, ro} services.
// * ReconcileEndpoints is called periodically from all apiservers. // * ReconcileEndpoints is called periodically from all apiservers.
ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error
StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error
} }
// Type the reconciler type // Type the reconciler type