move endpointslice reconciler to staging endpointslice repo

This commit is contained in:
Maciej Skrocki
2023-06-26 18:38:36 +00:00
parent 22c66784e0
commit 29fad383da
40 changed files with 953 additions and 344 deletions

View File

@@ -40,12 +40,13 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
endpointslicerec "k8s.io/endpointslice"
endpointslicemetrics "k8s.io/endpointslice/metrics"
"k8s.io/endpointslice/topologycache"
endpointsliceutil "k8s.io/endpointslice/util"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
endpointslicemetrics "k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice"
"k8s.io/kubernetes/pkg/features"
)
@@ -143,7 +144,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
c.maxEndpointsPerSlice = maxEndpointsPerSlice
c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
c.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
c.eventBroadcaster = broadcaster
c.eventRecorder = recorder
@@ -166,15 +167,14 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
c.topologyCache = topologycache.NewTopologyCache()
}
c.reconciler = &reconciler{
client: c.client,
nodeLister: c.nodeLister,
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
endpointSliceTracker: c.endpointSliceTracker,
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
topologyCache: c.topologyCache,
eventRecorder: c.eventRecorder,
}
c.reconciler = endpointslicerec.NewReconciler(
c.client,
c.nodeLister,
c.maxEndpointsPerSlice,
c.endpointSliceTracker,
c.topologyCache,
c.eventRecorder,
)
return c
}
@@ -218,11 +218,11 @@ type Controller struct {
nodesSynced cache.InformerSynced
// reconciler is an util used to reconcile EndpointSlice changes.
reconciler *reconciler
reconciler *endpointslicerec.Reconciler
// triggerTimeTracker is an util used to compute and export the
// EndpointsLastChangeTriggerTime annotation.
triggerTimeTracker *endpointutil.TriggerTimeTracker
triggerTimeTracker *endpointsliceutil.TriggerTimeTracker
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
@@ -334,7 +334,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error {
}
c.triggerTimeTracker.DeleteService(namespace, name)
c.reconciler.deleteService(namespace, name)
c.reconciler.DeleteService(namespace, name)
c.endpointSliceTracker.DeleteService(namespace, name)
// The service has been deleted, return nil so that it won't be retried.
return nil
@@ -366,7 +366,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error {
esLabelSelector := labels.Set(map[string]string{
discovery.LabelServiceName: service.Name,
discovery.LabelManagedBy: controllerName,
discovery.LabelManagedBy: endpointslicerec.GetReconcilerName(),
}).AsSelectorPreValidated()
endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)
@@ -382,7 +382,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error {
endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices)
if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
return endpointsliceutil.NewStaleInformerCache("EndpointSlice informer cache is out of date")
return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
}
// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
@@ -391,7 +391,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error {
lastChangeTriggerTime := c.triggerTimeTracker.
ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
err = c.reconciler.reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime)
err = c.reconciler.Reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime)
if err != nil {
c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
"Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
@@ -432,7 +432,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
return
}
if managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
if endpointslicerec.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
c.queueServiceForEndpointSlice(endpointSlice)
}
}
@@ -459,7 +459,7 @@ func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj inte
c.queueServiceForEndpointSlice(prevEndpointSlice)
return
}
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
if endpointslicerec.ManagedByChanged(prevEndpointSlice, endpointSlice) || (endpointslicerec.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
c.queueServiceForEndpointSlice(endpointSlice)
}
}
@@ -469,7 +469,7 @@ func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj inte
// endpointSliceTracker.
func (c *Controller) onEndpointSliceDelete(obj interface{}) {
endpointSlice := getEndpointSliceFromDeleteAction(obj)
if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
if endpointSlice != nil && endpointslicerec.ManagedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
// This returns false if we didn't expect the EndpointSlice to be
// deleted. If that is the case, we queue the Service for another sync.
if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
@@ -481,7 +481,7 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) {
// queueServiceForEndpointSlice attempts to queue the corresponding Service for
// the provided EndpointSlice.
func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
key, err := serviceControllerKey(endpointSlice)
key, err := endpointslicerec.ServiceControllerKey(endpointSlice)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
return
@@ -498,7 +498,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo
func (c *Controller) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
services, err := endpointutil.GetPodServiceMemberships(c.serviceLister, pod)
services, err := endpointsliceutil.GetPodServiceMemberships(c.serviceLister, pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
return
@@ -509,7 +509,7 @@ func (c *Controller) addPod(obj interface{}) {
}
func (c *Controller) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
for key := range services {
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
}
@@ -518,7 +518,7 @@ func (c *Controller) updatePod(old, cur interface{}) {
// When a pod is deleted, enqueue the services the pod used to be a member of
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (c *Controller) deletePod(obj interface{}) {
pod := endpointutil.GetPodFromDeleteAction(obj)
pod := endpointsliceutil.GetPodFromDeleteAction(obj)
if pod != nil {
c.addPod(pod)
}
@@ -567,7 +567,7 @@ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
func trackSync(err error) {
metricLabel := "success"
if err != nil {
if endpointsliceutil.IsStaleInformerCacheErr(err) {
if endpointslicepkg.IsStaleInformerCacheErr(err) {
metricLabel = "stale"
} else {
metricLabel = "error"
@@ -586,3 +586,34 @@ func dropEndpointSlicesPendingDeletion(endpointSlices []*discovery.EndpointSlice
}
return endpointSlices[:n]
}
// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
// Enqueue all the services that the pod used to be a member of.
// This is the same thing we do when we add a pod.
return endpointSlice
}
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return nil
}
endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj))
return nil
}
return endpointSlice
}
// isNodeReady returns true if a node is ready; false otherwise.
func isNodeReady(node *v1.Node) bool {
for _, c := range node.Status.Conditions {
if c.Type == v1.NodeReady {
return c.Status == v1.ConditionTrue
}
}
return false
}