mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-04 23:17:50 +00:00
Updating EndpointSliceMirroring controller to listen for Service changes
This fixes a bug that could occur if a custom Endpoints resource was created before a Service was created.
This commit is contained in:
@@ -25,7 +25,6 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@@ -121,6 +120,11 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer,
|
||||
|
||||
c.serviceLister = serviceInformer.Lister()
|
||||
c.servicesSynced = serviceInformer.Informer().HasSynced
|
||||
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.onServiceAdd,
|
||||
UpdateFunc: c.onServiceUpdate,
|
||||
DeleteFunc: c.onServiceDelete,
|
||||
})
|
||||
|
||||
c.maxEndpointsPerSubset = maxEndpointsPerSubset
|
||||
|
||||
@@ -273,28 +277,47 @@ func (c *Controller) syncEndpoints(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
|
||||
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
|
||||
if err != nil {
|
||||
ep := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
|
||||
c.eventRecorder.Eventf(ep, FailedToListEndpointSlices,
|
||||
"Error listing EndpointSlices for Endpoints %s/%s: %v", ep.Namespace, ep.Name, err)
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(4).Infof("%s/%s Endpoints not found, cleaning up any mirrored EndpointSlices", namespace, name)
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
|
||||
if err != nil || !c.shouldMirror(endpoints) {
|
||||
if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) {
|
||||
if !c.shouldMirror(endpoints) {
|
||||
klog.V(4).Infof("%s/%s Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", namespace, name)
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
}
|
||||
|
||||
svc, err := c.serviceLister.Services(namespace).Get(name)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(4).Infof("%s/%s Service not found, cleaning up any mirrored EndpointSlices", namespace, name)
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// This means that if a Service transitions away from a nil selector, any
|
||||
// mirrored EndpointSlices will not be cleaned up. #91072 tracks this issue
|
||||
// for this controller along with the Endpoints and EndpointSlice
|
||||
// controllers.
|
||||
if svc.Spec.Selector != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.reconciler.reconcile(endpoints, endpointSlices)
|
||||
if err != nil {
|
||||
c.eventRecorder.Eventf(endpoints, v1.EventTypeWarning, FailedToUpdateEndpointSlices,
|
||||
"Error updating EndpointSlices for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -314,30 +337,56 @@ func (c *Controller) queueEndpoints(obj interface{}) {
|
||||
|
||||
// shouldMirror returns true if an Endpoints resource should be mirrored by this
|
||||
// controller. This will be false if:
|
||||
// - the Endpoints resource is nil.
|
||||
// - the Endpoints resource has a skip-mirror label.
|
||||
// - the Endpoints resource has a leader election annotation.
|
||||
// - the corresponding Service resource does not exist.
|
||||
// - the corresponding Service resource has a non-nil selector.
|
||||
// This does not ensure that a corresponding Service exists with a nil selector.
|
||||
// That check should be performed separately.
|
||||
func (c *Controller) shouldMirror(endpoints *v1.Endpoints) bool {
|
||||
if endpoints == nil || skipMirror(endpoints.Labels) || hasLeaderElection(endpoints.Annotations) {
|
||||
return false
|
||||
}
|
||||
|
||||
svc, err := c.serviceLister.Services(endpoints.Namespace).Get(endpoints.Name)
|
||||
if err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
klog.Errorf("Error fetching %s/%s Service: %v", endpoints.Namespace, endpoints.Name, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
if svc.Spec.Selector != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// onServiceAdd queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onServiceAdd(obj interface{}) {
|
||||
service := obj.(*v1.Service)
|
||||
if service == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceAdd() expected type v1.Service, got %T", obj))
|
||||
return
|
||||
}
|
||||
if service.Spec.Selector == nil {
|
||||
c.queueEndpoints(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// onServiceUpdate queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onServiceUpdate(prevObj, obj interface{}) {
|
||||
service := obj.(*v1.Service)
|
||||
prevService := prevObj.(*v1.Service)
|
||||
if service == nil || prevService == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj))
|
||||
return
|
||||
}
|
||||
if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) {
|
||||
c.queueEndpoints(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// onServiceDelete queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onServiceDelete(obj interface{}) {
|
||||
service := getServiceFromDeleteAction(obj)
|
||||
if service == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceDelete() expected type v1.Service, got %T", obj))
|
||||
return
|
||||
}
|
||||
if service.Spec.Selector == nil {
|
||||
c.queueEndpoints(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// onEndpointsAdd queues a sync for the relevant Endpoints resource.
|
||||
func (c *Controller) onEndpointsAdd(obj interface{}) {
|
||||
endpoints := obj.(*v1.Endpoints)
|
||||
@@ -437,6 +486,18 @@ func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.End
|
||||
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
|
||||
}
|
||||
|
||||
// deleteMirroredSlices will delete and EndpointSlices that have been mirrored
|
||||
// for Endpoints with this namespace and name.
|
||||
func (c *Controller) deleteMirroredSlices(namespace, name string) error {
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
|
||||
}
|
||||
|
||||
// endpointSlicesMirroredForService returns the EndpointSlices that have been
|
||||
// mirrored for a Service by this controller.
|
||||
func endpointSlicesMirroredForService(endpointSliceLister discoverylisters.EndpointSliceLister, namespace, name string) ([]*discovery.EndpointSlice, error) {
|
||||
|
||||
Reference in New Issue
Block a user