From e567490317850bbf9441938b928575a4f4e7818c Mon Sep 17 00:00:00 2001 From: czybjtu Date: Thu, 16 Mar 2023 22:55:50 +0800 Subject: [PATCH] fix: remove last ip when apiserver was shut down --- pkg/controlplane/reconcilers/lease.go | 42 ++++-- pkg/controlplane/reconcilers/lease_test.go | 168 ++++++++++++++++++++- 2 files changed, 188 insertions(+), 22 deletions(-) diff --git a/pkg/controlplane/reconcilers/lease.go b/pkg/controlplane/reconcilers/lease.go index 9911d6a4272..ce7b478cb78 100644 --- a/pkg/controlplane/reconcilers/lease.go +++ b/pkg/controlplane/reconcilers/lease.go @@ -26,6 +26,7 @@ import ( "net" "path" "sync" + "sync/atomic" "time" "k8s.io/klog/v2" @@ -92,6 +93,7 @@ func (s *storageLeases) ListLeases() ([]string, error) { } // UpdateLease resets the TTL on a master IP in storage +// UpdateLease will create a new key if it doesn't exist. func (s *storageLeases) UpdateLease(ip string) error { key := path.Join(s.baseKey, ip) return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) { @@ -146,16 +148,15 @@ func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTi type leaseEndpointReconciler struct { epAdapter EndpointsAdapter masterLeases Leases - stopReconcilingCalled bool + stopReconcilingCalled atomic.Bool reconcilingLock sync.Mutex } // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) EndpointReconciler { return &leaseEndpointReconciler{ - epAdapter: epAdapter, - masterLeases: masterLeases, - stopReconcilingCalled: false, + epAdapter: epAdapter, + masterLeases: masterLeases, } } @@ -167,13 +168,15 @@ func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) // different from the directory listing, and update the endpoints object // accordingly. func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error { - r.reconcilingLock.Lock() - defer r.reconcilingLock.Unlock() - - if r.stopReconcilingCalled { + // reconcile endpoints only if apiserver was not shutdown + if r.stopReconcilingCalled.Load() { return nil } + // Ensure that there will be no race condition with the RemoveEndpoints. + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + // Refresh the TTL on our key, independently of whether any error or // update conflict happens below. This makes sure that at least some of // the masters will add our endpoint. @@ -184,6 +187,8 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net. return r.doReconcile(serviceName, endpointPorts, reconcilePorts) } +// doReconcile can be called from ReconcileEndpoints() or RemoveEndpoints(). +// it is NOT SAFE to call it from multiple goroutines. func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error { e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{}) shouldCreate := false @@ -192,6 +197,11 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts return err } + // there are no endpoints and we should stop reconciling + if r.stopReconcilingCalled.Load() { + return nil + } + shouldCreate = true e = &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -210,8 +220,10 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts // Since we just refreshed our own key, assume that zero endpoints // returned from storage indicates an issue or invalid state, and thus do // not update the endpoints list based on the result. - if len(masterIPs) == 0 { - return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service") + // If the controller was ordered to stop and is this is the last apiserver + // we keep going to remove our endpoint before shutting down. + if !r.stopReconcilingCalled.Load() && len(masterIPs) == 0 { + return fmt.Errorf("no API server IP addresses were listed in storage, refusing to erase all endpoints for the kubernetes Service") } // Don't use the EndpointSliceMirroring controller to mirror this to @@ -243,7 +255,7 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts e.Subsets = endpointsv1.RepackSubsets(e.Subsets) } - if !portsCorrect { + if len(e.Subsets) != 0 && !portsCorrect { // Reset ports. e.Subsets[0].Ports = endpointPorts } @@ -313,6 +325,10 @@ func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []strin } func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { + // Ensure that there will be no race condition with the ReconcileEndpoints. + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + if err := r.masterLeases.RemoveLease(ip.String()); err != nil { return err } @@ -321,9 +337,7 @@ func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, } func (r *leaseEndpointReconciler) StopReconciling() { - r.reconcilingLock.Lock() - defer r.reconcilingLock.Unlock() - r.stopReconcilingCalled = true + r.stopReconcilingCalled.Store(true) } func (r *leaseEndpointReconciler) Destroy() { diff --git a/pkg/controlplane/reconcilers/lease_test.go b/pkg/controlplane/reconcilers/lease_test.go index 79195b3f54d..cc4884d3610 100644 --- a/pkg/controlplane/reconcilers/lease_test.go +++ b/pkg/controlplane/reconcilers/lease_test.go @@ -465,15 +465,27 @@ func TestLeaseRemoveEndpoints(t *testing.T) { t.Cleanup(dFunc) stopTests := []struct { - testName string - serviceName string - ip string - endpointPorts []corev1.EndpointPort - endpointKeys []string - initialState []runtime.Object - expectUpdate []runtime.Object - expectLeases []string + testName string + serviceName string + ip string + endpointPorts []corev1.EndpointPort + endpointKeys []string + initialState []runtime.Object + expectUpdate []runtime.Object + expectLeases []string + apiServerStartup bool }{ + { + testName: "successful remove previous endpoints before apiserver starts", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, + initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, + apiServerStartup: true, + }, { testName: "successful stop reconciling", serviceName: "foo", @@ -503,6 +515,16 @@ func TestLeaseRemoveEndpoints(t *testing.T) { expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, }, + { + testName: "the last API server was shut down cleanly", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4"}, + initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}), + expectLeases: []string{}, + }, } for _, test := range stopTests { t.Run(test.testName, func(t *testing.T) { @@ -514,6 +536,9 @@ func TestLeaseRemoveEndpoints(t *testing.T) { clientset := fake.NewSimpleClientset(test.initialState...) epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) + if !test.apiServerStartup { + r.StopReconciling() + } err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts) // if the ip is not on the endpoints, it must return an storage error and stop reconciling if !contains(test.endpointKeys, test.ip) { @@ -551,3 +576,130 @@ func contains(s []string, str string) bool { } return false } + +func TestApiserverShutdown(t *testing.T) { + server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + t.Cleanup(func() { server.Terminate(t) }) + + newFunc := func() runtime.Object { return &corev1.Endpoints{} } + sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) + + s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc) + if err != nil { + t.Fatalf("Error creating storage: %v", err) + } + t.Cleanup(dFunc) + + reconcileTests := []struct { + testName string + serviceName string + ip string + endpointPorts []corev1.EndpointPort + endpointKeys []string + initialState []runtime.Object + expectUpdate []runtime.Object + expectLeases []string + shutDownBeforeReconcile bool + }{ + { + testName: "last apiserver shutdown after endpoint reconcile", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4"}, + initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}), + expectLeases: []string{}, + shutDownBeforeReconcile: false, + }, + { + testName: "last apiserver shutdown before endpoint reconcile", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4"}, + initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}), + expectLeases: []string{}, + shutDownBeforeReconcile: true, + }, + { + testName: "not the last apiserver which was shutdown before endpoint reconcile", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4", "4.3.2.1"}, + initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"4.3.2.1"}, + shutDownBeforeReconcile: true, + }, + { + testName: "not the last apiserver which was shutdown after endpoint reconcile", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointKeys: []string{"1.2.3.4", "4.3.2.1"}, + initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"4.3.2.1"}, + shutDownBeforeReconcile: false, + }, + } + for _, test := range reconcileTests { + t.Run(test.testName, func(t *testing.T) { + fakeLeases := newFakeLeases(t, s) + err := fakeLeases.SetKeys(test.endpointKeys) + if err != nil { + t.Errorf("unexpected error creating keys: %v", err) + } + clientset := fake.NewSimpleClientset(test.initialState...) + + epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) + r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) + + if test.shutDownBeforeReconcile { + // shutdown apiserver first + r.StopReconciling() + err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts) + if err != nil { + t.Errorf("unexpected error remove endpoints: %v", err) + } + + // reconcile endpoints in another goroutine + err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false) + if err != nil { + t.Errorf("unexpected error reconciling: %v", err) + } + } else { + // reconcile endpoints first + err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false) + if err != nil { + t.Errorf("unexpected error reconciling: %v", err) + } + + r.StopReconciling() + err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts) + if err != nil { + t.Errorf("unexpected error remove endpoints: %v", err) + } + } + + err = verifyCreatesAndUpdates(clientset, nil, test.expectUpdate) + if err != nil { + t.Errorf("unexpected error in side effects: %v", err) + } + + leases, err := fakeLeases.ListLeases() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort for comparison + sort.Strings(leases) + sort.Strings(test.expectLeases) + if !reflect.DeepEqual(leases, test.expectLeases) { + t.Errorf("expected %v got: %v", test.expectLeases, leases) + } + }) + } +}