mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #94171 from robscott/endpointslicemirroring-fix
Updating EndpointSliceMirroring controller to listen for Service changes
This commit is contained in:
commit
4db3a096ce
@ -25,7 +25,6 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@ -121,6 +120,11 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer,
|
||||
|
||||
c.serviceLister = serviceInformer.Lister()
|
||||
c.servicesSynced = serviceInformer.Informer().HasSynced
|
||||
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.onServiceAdd,
|
||||
UpdateFunc: c.onServiceUpdate,
|
||||
DeleteFunc: c.onServiceDelete,
|
||||
})
|
||||
|
||||
c.maxEndpointsPerSubset = maxEndpointsPerSubset
|
||||
|
||||
@ -273,28 +277,47 @@ func (c *Controller) syncEndpoints(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
|
||||
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
|
||||
if err != nil {
|
||||
ep := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
|
||||
c.eventRecorder.Eventf(ep, FailedToListEndpointSlices,
|
||||
"Error listing EndpointSlices for Endpoints %s/%s: %v", ep.Namespace, ep.Name, err)
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(4).Infof("%s/%s Endpoints not found, cleaning up any mirrored EndpointSlices", namespace, name)
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
|
||||
if err != nil || !c.shouldMirror(endpoints) {
|
||||
if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) {
|
||||
if !c.shouldMirror(endpoints) {
|
||||
klog.V(4).Infof("%s/%s Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", namespace, name)
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
}
|
||||
|
||||
svc, err := c.serviceLister.Services(namespace).Get(name)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(4).Infof("%s/%s Service not found, cleaning up any mirrored EndpointSlices", namespace, name)
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// This means that if a Service transitions away from a nil selector, any
|
||||
// mirrored EndpointSlices will not be cleaned up. #91072 tracks this issue
|
||||
// for this controller along with the Endpoints and EndpointSlice
|
||||
// controllers.
|
||||
if svc.Spec.Selector != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.reconciler.reconcile(endpoints, endpointSlices)
|
||||
if err != nil {
|
||||
c.eventRecorder.Eventf(endpoints, v1.EventTypeWarning, FailedToUpdateEndpointSlices,
|
||||
"Error updating EndpointSlices for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -314,30 +337,56 @@ func (c *Controller) queueEndpoints(obj interface{}) {
|
||||
|
||||
// shouldMirror returns true if an Endpoints resource should be mirrored by this
|
||||
// controller. This will be false if:
|
||||
// - the Endpoints resource is nil.
|
||||
// - the Endpoints resource has a skip-mirror label.
|
||||
// - the Endpoints resource has a leader election annotation.
|
||||
// - the corresponding Service resource does not exist.
|
||||
// - the corresponding Service resource has a non-nil selector.
|
||||
// This does not ensure that a corresponding Service exists with a nil selector.
|
||||
// That check should be performed separately.
|
||||
func (c *Controller) shouldMirror(endpoints *v1.Endpoints) bool {
|
||||
if endpoints == nil || skipMirror(endpoints.Labels) || hasLeaderElection(endpoints.Annotations) {
|
||||
return false
|
||||
}
|
||||
|
||||
svc, err := c.serviceLister.Services(endpoints.Namespace).Get(endpoints.Name)
|
||||
if err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
klog.Errorf("Error fetching %s/%s Service: %v", endpoints.Namespace, endpoints.Name, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
if svc.Spec.Selector != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// onServiceAdd queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onServiceAdd(obj interface{}) {
|
||||
service := obj.(*v1.Service)
|
||||
if service == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceAdd() expected type v1.Service, got %T", obj))
|
||||
return
|
||||
}
|
||||
if service.Spec.Selector == nil {
|
||||
c.queueEndpoints(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// onServiceUpdate queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onServiceUpdate(prevObj, obj interface{}) {
|
||||
service := obj.(*v1.Service)
|
||||
prevService := prevObj.(*v1.Service)
|
||||
if service == nil || prevService == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj))
|
||||
return
|
||||
}
|
||||
if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) {
|
||||
c.queueEndpoints(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// onServiceDelete queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onServiceDelete(obj interface{}) {
|
||||
service := getServiceFromDeleteAction(obj)
|
||||
if service == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceDelete() expected type v1.Service, got %T", obj))
|
||||
return
|
||||
}
|
||||
if service.Spec.Selector == nil {
|
||||
c.queueEndpoints(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// onEndpointsAdd queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onEndpointsAdd(obj interface{}) {
|
||||
endpoints := obj.(*v1.Endpoints)
|
||||
@ -437,6 +486,18 @@ func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.End
|
||||
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
|
||||
}
|
||||
|
||||
// deleteMirroredSlices will delete and EndpointSlices that have been mirrored
|
||||
// for Endpoints with this namespace and name.
|
||||
func (c *Controller) deleteMirroredSlices(namespace, name string) error {
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
|
||||
}
|
||||
|
||||
// endpointSlicesMirroredForService returns the EndpointSlices that have been
|
||||
// mirrored for a Service by this controller.
|
||||
func endpointSlicesMirroredForService(endpointSliceLister discoverylisters.EndpointSliceLister, namespace, name string) ([]*discovery.EndpointSlice, error) {
|
||||
|
@ -74,12 +74,14 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
testName string
|
||||
service *v1.Service
|
||||
endpoints *v1.Endpoints
|
||||
endpointSlices []*discovery.EndpointSlice
|
||||
expectedNumActions int
|
||||
expectedNumSlices int
|
||||
}{{
|
||||
testName: "Endpoints with no addresses",
|
||||
service: &v1.Service{},
|
||||
endpoints: &v1.Endpoints{
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Ports: []v1.EndpointPort{{Port: 80}},
|
||||
@ -90,6 +92,7 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
expectedNumSlices: 0,
|
||||
}, {
|
||||
testName: "Endpoints with skip label true",
|
||||
service: &v1.Service{},
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{discovery.LabelSkipMirror: "true"},
|
||||
@ -104,6 +107,7 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
expectedNumSlices: 0,
|
||||
}, {
|
||||
testName: "Endpoints with skip label false",
|
||||
service: &v1.Service{},
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{discovery.LabelSkipMirror: "false"},
|
||||
@ -116,8 +120,37 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
endpointSlices: []*discovery.EndpointSlice{},
|
||||
expectedNumActions: 1,
|
||||
expectedNumSlices: 1,
|
||||
}, {
|
||||
testName: "Endpoints with missing Service",
|
||||
service: nil,
|
||||
endpoints: &v1.Endpoints{
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Ports: []v1.EndpointPort{{Port: 80}},
|
||||
Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}},
|
||||
}},
|
||||
},
|
||||
endpointSlices: []*discovery.EndpointSlice{},
|
||||
expectedNumActions: 0,
|
||||
expectedNumSlices: 0,
|
||||
}, {
|
||||
testName: "Endpoints with Service with selector specified",
|
||||
service: &v1.Service{
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
},
|
||||
},
|
||||
endpoints: &v1.Endpoints{
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Ports: []v1.EndpointPort{{Port: 80}},
|
||||
Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}},
|
||||
}},
|
||||
},
|
||||
endpointSlices: []*discovery.EndpointSlice{},
|
||||
expectedNumActions: 0,
|
||||
expectedNumSlices: 0,
|
||||
}, {
|
||||
testName: "Existing EndpointSlices that need to be cleaned up",
|
||||
service: &v1.Service{},
|
||||
endpoints: &v1.Endpoints{
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Ports: []v1.EndpointPort{{Port: 80}},
|
||||
@ -136,6 +169,7 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
expectedNumSlices: 0,
|
||||
}, {
|
||||
testName: "Existing EndpointSlices managed by a different controller, no addresses to sync",
|
||||
service: &v1.Service{},
|
||||
endpoints: &v1.Endpoints{
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Ports: []v1.EndpointPort{{Port: 80}},
|
||||
@ -154,6 +188,7 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
expectedNumSlices: 0,
|
||||
}, {
|
||||
testName: "Endpoints with 1000 addresses",
|
||||
service: &v1.Service{},
|
||||
endpoints: &v1.Endpoints{
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Ports: []v1.EndpointPort{{Port: 80}},
|
||||
@ -165,6 +200,7 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
expectedNumSlices: 1,
|
||||
}, {
|
||||
testName: "Endpoints with 1001 addresses - 1 should not be mirrored",
|
||||
service: &v1.Service{},
|
||||
endpoints: &v1.Endpoints{
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Ports: []v1.EndpointPort{{Port: 80}},
|
||||
@ -182,10 +218,11 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
tc.endpoints.Name = endpointsName
|
||||
tc.endpoints.Namespace = namespace
|
||||
esController.endpointsStore.Add(tc.endpoints)
|
||||
esController.serviceStore.Add(&v1.Service{ObjectMeta: metav1.ObjectMeta{
|
||||
Name: endpointsName,
|
||||
Namespace: namespace,
|
||||
}})
|
||||
if tc.service != nil {
|
||||
tc.service.Name = endpointsName
|
||||
tc.service.Namespace = namespace
|
||||
esController.serviceStore.Add(tc.service)
|
||||
}
|
||||
|
||||
for _, epSlice := range tc.endpointSlices {
|
||||
epSlice.Namespace = namespace
|
||||
@ -214,45 +251,23 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestShouldMirror(t *testing.T) {
|
||||
svcWithSelector := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "with-selector",
|
||||
Namespace: "example1",
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: map[string]string{"with": "selector"},
|
||||
},
|
||||
}
|
||||
svcWithoutSelector := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "without-selector",
|
||||
Namespace: "example1",
|
||||
},
|
||||
Spec: v1.ServiceSpec{},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
testName string
|
||||
endpoints *v1.Endpoints
|
||||
service *v1.Service
|
||||
shouldMirror bool
|
||||
}{{
|
||||
testName: "Service without selector with matching endpoints",
|
||||
service: svcWithoutSelector,
|
||||
testName: "Standard Endpoints",
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: svcWithoutSelector.Name,
|
||||
Namespace: svcWithoutSelector.Namespace,
|
||||
Name: "test-endpoints",
|
||||
},
|
||||
},
|
||||
shouldMirror: true,
|
||||
}, {
|
||||
testName: "Service without selector, matching Endpoints with skip-mirror=true",
|
||||
service: svcWithoutSelector,
|
||||
testName: "Endpoints with skip-mirror=true",
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: svcWithSelector.Name,
|
||||
Namespace: svcWithSelector.Namespace,
|
||||
Name: "test-endpoints",
|
||||
Labels: map[string]string{
|
||||
discovery.LabelSkipMirror: "true",
|
||||
},
|
||||
@ -260,12 +275,10 @@ func TestShouldMirror(t *testing.T) {
|
||||
},
|
||||
shouldMirror: false,
|
||||
}, {
|
||||
testName: "Service without selector, matching Endpoints with skip-mirror=invalid",
|
||||
service: svcWithoutSelector,
|
||||
testName: "Endpoints with skip-mirror=invalid",
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: svcWithoutSelector.Name,
|
||||
Namespace: svcWithoutSelector.Namespace,
|
||||
Name: "test-endpoints",
|
||||
Labels: map[string]string{
|
||||
discovery.LabelSkipMirror: "invalid",
|
||||
},
|
||||
@ -273,43 +286,16 @@ func TestShouldMirror(t *testing.T) {
|
||||
},
|
||||
shouldMirror: true,
|
||||
}, {
|
||||
testName: "Service without selector, matching Endpoints with leader election annotation",
|
||||
service: svcWithoutSelector,
|
||||
testName: "Endpoints with leader election annotation",
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: svcWithSelector.Name,
|
||||
Namespace: svcWithSelector.Namespace,
|
||||
Name: "test-endpoints",
|
||||
Annotations: map[string]string{
|
||||
resourcelock.LeaderElectionRecordAnnotationKey: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
shouldMirror: false,
|
||||
}, {
|
||||
testName: "Service without selector, matching Endpoints without skip label in different namespace",
|
||||
service: svcWithSelector,
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: svcWithSelector.Name,
|
||||
Namespace: svcWithSelector.Namespace + "different",
|
||||
},
|
||||
},
|
||||
shouldMirror: false,
|
||||
}, {
|
||||
testName: "Service without selector or matching endpoints",
|
||||
service: svcWithoutSelector,
|
||||
endpoints: nil,
|
||||
shouldMirror: false,
|
||||
}, {
|
||||
testName: "Endpoints without matching Service",
|
||||
service: nil,
|
||||
endpoints: &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: svcWithoutSelector.Name,
|
||||
Namespace: svcWithoutSelector.Namespace,
|
||||
},
|
||||
},
|
||||
shouldMirror: false,
|
||||
}}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -323,13 +309,6 @@ func TestShouldMirror(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if tc.service != nil {
|
||||
err := c.serviceStore.Add(tc.service)
|
||||
if err != nil {
|
||||
t.Fatalf("Error adding Service to store: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
shouldMirror := c.shouldMirror(tc.endpoints)
|
||||
|
||||
if shouldMirror != tc.shouldMirror {
|
||||
|
@ -185,14 +185,35 @@ func objectRefPtrEqual(ref1, ref2 *corev1.ObjectReference) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// getServiceFromDeleteAction parses a Service resource from a delete
|
||||
// action.
|
||||
func getServiceFromDeleteAction(obj interface{}) *corev1.Service {
|
||||
if service, ok := obj.(*corev1.Service); ok {
|
||||
return service
|
||||
}
|
||||
// If we reached here it means the Service was deleted but its final state
|
||||
// is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
||||
return nil
|
||||
}
|
||||
service, ok := tombstone.Obj.(*corev1.Service)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Service resource: %#v", obj))
|
||||
return nil
|
||||
}
|
||||
return service
|
||||
}
|
||||
|
||||
// getEndpointsFromDeleteAction parses an Endpoints resource from a delete
|
||||
// action.
|
||||
func getEndpointsFromDeleteAction(obj interface{}) *corev1.Endpoints {
|
||||
if endpointSlice, ok := obj.(*corev1.Endpoints); ok {
|
||||
return endpointSlice
|
||||
if endpoints, ok := obj.(*corev1.Endpoints); ok {
|
||||
return endpoints
|
||||
}
|
||||
// If we reached here it means the EndpointSlice was deleted but its final
|
||||
// state is unrecorded.
|
||||
// If we reached here it means the Endpoints resource was deleted but its
|
||||
// final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
||||
|
@ -156,6 +156,14 @@ func TestEndpointSliceMirroring(t *testing.T) {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-123",
|
||||
},
|
||||
Subsets: []corev1.EndpointSubset{{
|
||||
Ports: []corev1.EndpointPort{{
|
||||
Port: 80,
|
||||
}},
|
||||
Addresses: []corev1.EndpointAddress{{
|
||||
IP: "10.0.0.1",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
expectEndpointSlice: false,
|
||||
}}
|
||||
|
Loading…
Reference in New Issue
Block a user