Merge pull request #116685 from czybjtu/fix_lease_remove_endpoints

Remove last endpoint for kubernetes Service during graceful shutdown of final kube-apiserver
This commit is contained in:
Kubernetes Prow Robot 2023-04-28 06:02:16 -07:00 committed by GitHub
commit f66e1a3386
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 188 additions and 22 deletions

View File

@ -26,6 +26,7 @@ import (
"net"
"path"
"sync"
"sync/atomic"
"time"
"k8s.io/klog/v2"
@ -92,6 +93,7 @@ func (s *storageLeases) ListLeases() ([]string, error) {
}
// UpdateLease resets the TTL on a master IP in storage
// UpdateLease will create a new key if it doesn't exist.
func (s *storageLeases) UpdateLease(ip string) error {
key := path.Join(s.baseKey, ip)
return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
@ -146,16 +148,15 @@ func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTi
type leaseEndpointReconciler struct {
epAdapter EndpointsAdapter
masterLeases Leases
stopReconcilingCalled bool
stopReconcilingCalled atomic.Bool
reconcilingLock sync.Mutex
}
// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) EndpointReconciler {
return &leaseEndpointReconciler{
epAdapter: epAdapter,
masterLeases: masterLeases,
stopReconcilingCalled: false,
epAdapter: epAdapter,
masterLeases: masterLeases,
}
}
@ -167,13 +168,15 @@ func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases)
// different from the directory listing, and update the endpoints object
// accordingly.
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled {
// reconcile endpoints only if apiserver was not shutdown
if r.stopReconcilingCalled.Load() {
return nil
}
// Ensure that there will be no race condition with the RemoveEndpoints.
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
// Refresh the TTL on our key, independently of whether any error or
// update conflict happens below. This makes sure that at least some of
// the masters will add our endpoint.
@ -184,6 +187,8 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.
return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
// doReconcile can be called from ReconcileEndpoints() or RemoveEndpoints().
// it is NOT SAFE to call it from multiple goroutines.
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
shouldCreate := false
@ -192,6 +197,11 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
return err
}
// there are no endpoints and we should stop reconciling
if r.stopReconcilingCalled.Load() {
return nil
}
shouldCreate = true
e = &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
@ -210,8 +220,10 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
// Since we just refreshed our own key, assume that zero endpoints
// returned from storage indicates an issue or invalid state, and thus do
// not update the endpoints list based on the result.
if len(masterIPs) == 0 {
return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
// If the controller was ordered to stop and is this is the last apiserver
// we keep going to remove our endpoint before shutting down.
if !r.stopReconcilingCalled.Load() && len(masterIPs) == 0 {
return fmt.Errorf("no API server IP addresses were listed in storage, refusing to erase all endpoints for the kubernetes Service")
}
// Don't use the EndpointSliceMirroring controller to mirror this to
@ -243,7 +255,7 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
}
if !portsCorrect {
if len(e.Subsets) != 0 && !portsCorrect {
// Reset ports.
e.Subsets[0].Ports = endpointPorts
}
@ -313,6 +325,10 @@ func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []strin
}
func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
// Ensure that there will be no race condition with the ReconcileEndpoints.
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
return err
}
@ -321,9 +337,7 @@ func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP,
}
func (r *leaseEndpointReconciler) StopReconciling() {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
r.stopReconcilingCalled.Store(true)
}
func (r *leaseEndpointReconciler) Destroy() {

View File

@ -465,15 +465,27 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
t.Cleanup(dFunc)
stopTests := []struct {
testName string
serviceName string
ip string
endpointPorts []corev1.EndpointPort
endpointKeys []string
initialState []runtime.Object
expectUpdate []runtime.Object
expectLeases []string
testName string
serviceName string
ip string
endpointPorts []corev1.EndpointPort
endpointKeys []string
initialState []runtime.Object
expectUpdate []runtime.Object
expectLeases []string
apiServerStartup bool
}{
{
testName: "successful remove previous endpoints before apiserver starts",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
apiServerStartup: true,
},
{
testName: "successful stop reconciling",
serviceName: "foo",
@ -503,6 +515,16 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
},
{
testName: "the last API server was shut down cleanly",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
expectLeases: []string{},
},
}
for _, test := range stopTests {
t.Run(test.testName, func(t *testing.T) {
@ -514,6 +536,9 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
clientset := fake.NewSimpleClientset(test.initialState...)
epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
if !test.apiServerStartup {
r.StopReconciling()
}
err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
// if the ip is not on the endpoints, it must return an storage error and stop reconciling
if !contains(test.endpointKeys, test.ip) {
@ -551,3 +576,130 @@ func contains(s []string, str string) bool {
}
return false
}
func TestApiserverShutdown(t *testing.T) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
t.Cleanup(dFunc)
reconcileTests := []struct {
testName string
serviceName string
ip string
endpointPorts []corev1.EndpointPort
endpointKeys []string
initialState []runtime.Object
expectUpdate []runtime.Object
expectLeases []string
shutDownBeforeReconcile bool
}{
{
testName: "last apiserver shutdown after endpoint reconcile",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
expectLeases: []string{},
shutDownBeforeReconcile: false,
},
{
testName: "last apiserver shutdown before endpoint reconcile",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
expectLeases: []string{},
shutDownBeforeReconcile: true,
},
{
testName: "not the last apiserver which was shutdown before endpoint reconcile",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.1"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.1"},
shutDownBeforeReconcile: true,
},
{
testName: "not the last apiserver which was shutdown after endpoint reconcile",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.1"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.1"},
shutDownBeforeReconcile: false,
},
}
for _, test := range reconcileTests {
t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases(t, s)
err := fakeLeases.SetKeys(test.endpointKeys)
if err != nil {
t.Errorf("unexpected error creating keys: %v", err)
}
clientset := fake.NewSimpleClientset(test.initialState...)
epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
if test.shutDownBeforeReconcile {
// shutdown apiserver first
r.StopReconciling()
err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
if err != nil {
t.Errorf("unexpected error remove endpoints: %v", err)
}
// reconcile endpoints in another goroutine
err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
if err != nil {
t.Errorf("unexpected error reconciling: %v", err)
}
} else {
// reconcile endpoints first
err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
if err != nil {
t.Errorf("unexpected error reconciling: %v", err)
}
r.StopReconciling()
err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
if err != nil {
t.Errorf("unexpected error remove endpoints: %v", err)
}
}
err = verifyCreatesAndUpdates(clientset, nil, test.expectUpdate)
if err != nil {
t.Errorf("unexpected error in side effects: %v", err)
}
leases, err := fakeLeases.ListLeases()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort for comparison
sort.Strings(leases)
sort.Strings(test.expectLeases)
if !reflect.DeepEqual(leases, test.expectLeases) {
t.Errorf("expected %v got: %v", test.expectLeases, leases)
}
})
}
}