diff --git a/go.mod b/go.mod index cf223e48b1f..49a606db95a 100644 --- a/go.mod +++ b/go.mod @@ -107,6 +107,7 @@ require ( k8s.io/cri-api v0.0.0 k8s.io/csi-translation-lib v0.0.0 k8s.io/dynamic-resource-allocation v0.0.0 + k8s.io/endpointslice v0.0.0 k8s.io/gengo v0.0.0-20220902162205-c0856e24416d k8s.io/klog/v2 v2.100.1 k8s.io/kms v0.0.0 diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index f2a6e19947a..c4c2236756c 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -26,6 +26,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -38,13 +39,13 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/api/v1/endpoints" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" utillabels "k8s.io/kubernetes/pkg/util/labels" utilnet "k8s.io/utils/net" ) @@ -104,7 +105,7 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.endpointsLister = endpointsInformer.Lister() e.endpointsSynced = endpointsInformer.Informer().HasSynced - e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker() + e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker() e.eventBroadcaster = broadcaster e.eventRecorder = recorder @@ -152,7 +153,7 @@ type Controller struct { // triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime // annotation. - triggerTimeTracker *endpointutil.TriggerTimeTracker + triggerTimeTracker *endpointsliceutil.TriggerTimeTracker endpointUpdatesBatchPeriod time.Duration } @@ -193,7 +194,7 @@ func (e *Controller) Run(ctx context.Context, workers int) { // enqueue them. obj must have *v1.Pod type. func (e *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) - services, err := endpointutil.GetPodServiceMemberships(e.serviceLister, pod) + services, err := endpointsliceutil.GetPodServiceMemberships(e.serviceLister, pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return @@ -262,7 +263,7 @@ func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointA // and what services it will be a member of, and enqueue the union of these. // old and cur must be *v1.Pod types. func (e *Controller) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur) + services := endpointsliceutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur) for key := range services { e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) } @@ -271,7 +272,7 @@ func (e *Controller) updatePod(old, cur interface{}) { // When a pod is deleted, enqueue the services the pod used to be a member of. // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. func (e *Controller) deletePod(obj interface{}) { - pod := endpointutil.GetPodFromDeleteAction(obj) + pod := endpointsliceutil.GetPodFromDeleteAction(obj) if pod != nil { e.addPod(pod) } @@ -412,7 +413,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { var totalNotReadyEps int for _, pod := range pods { - if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) { + if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) { logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service)) continue } @@ -426,7 +427,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { } epa := *ep - if endpointutil.ShouldSetHostname(pod, service) { + if endpointsliceutil.ShouldSetHostname(pod, service) { epa.Hostname = pod.Spec.Hostname } @@ -483,7 +484,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints // updates caused by Pod updates that we don't care, e.g. annotation update. if !createEndpoints && - endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && + endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service)) @@ -702,3 +703,21 @@ func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddr } return addresses[0:maxNum] } + +// semanticIgnoreResourceVersion does semantic deep equality checks for objects +// but excludes ResourceVersion of ObjectReference. They are used when comparing +// endpoints in Endpoints and EndpointSlice objects to avoid unnecessary updates +// caused by Pod resourceVersion change. +var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie( + func(a, b v1.ObjectReference) bool { + a.ResourceVersion = "" + b.ResourceVersion = "" + return a == b + }, +) + +// endpointSubsetsEqualIgnoreResourceVersion returns true if EndpointSubsets +// have equal attributes but excludes ResourceVersion of Pod. +func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool { + return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2) +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 8d16c7cddd0..e71f07259dd 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -2559,3 +2559,101 @@ func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan case <-receivingChan: } } + +func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) { + copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset { + newSubSet := orig.DeepCopy() + mutator(newSubSet) + return newSubSet + } + es1 := &v1.EndpointSubset{ + Addresses: []v1.EndpointAddress{ + { + IP: "1.1.1.1", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"}, + }, + }, + NotReadyAddresses: []v1.EndpointAddress{ + { + IP: "1.1.1.2", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"}, + }, + }, + Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}}, + } + es2 := &v1.EndpointSubset{ + Addresses: []v1.EndpointAddress{ + { + IP: "2.2.2.1", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"}, + }, + }, + NotReadyAddresses: []v1.EndpointAddress{ + { + IP: "2.2.2.2", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"}, + }, + }, + Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}}, + } + tests := []struct { + name string + subsets1 []v1.EndpointSubset + subsets2 []v1.EndpointSubset + expected bool + }{ + { + name: "Subsets removed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1}, + expected: false, + }, + { + name: "Ready Pod IP changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) { + es.Addresses[0].IP = "1.1.1.10" + }), *es2}, + expected: false, + }, + { + name: "NotReady Pod IP changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { + es.NotReadyAddresses[0].IP = "2.2.2.10" + })}, + expected: false, + }, + { + name: "Pod ResourceVersion changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { + es.Addresses[0].TargetRef.ResourceVersion = "100" + })}, + expected: true, + }, + { + name: "Pod ResourceVersion removed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { + es.Addresses[0].TargetRef.ResourceVersion = "" + })}, + expected: true, + }, + { + name: "Ports changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) { + es.Ports[0].Port = 8082 + })}, + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := endpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected { + t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected) + } + }) + } +} diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index ba0ed5df2fb..c25214d5344 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -40,12 +40,13 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + endpointslicerec "k8s.io/endpointslice" + endpointslicemetrics "k8s.io/endpointslice/metrics" + "k8s.io/endpointslice/topologycache" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" - endpointslicemetrics "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" - "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" + endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice" "k8s.io/kubernetes/pkg/features" ) @@ -143,7 +144,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer, c.maxEndpointsPerSlice = maxEndpointsPerSlice - c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker() + c.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker() c.eventBroadcaster = broadcaster c.eventRecorder = recorder @@ -166,15 +167,14 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer, c.topologyCache = topologycache.NewTopologyCache() } - c.reconciler = &reconciler{ - client: c.client, - nodeLister: c.nodeLister, - maxEndpointsPerSlice: c.maxEndpointsPerSlice, - endpointSliceTracker: c.endpointSliceTracker, - metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice), - topologyCache: c.topologyCache, - eventRecorder: c.eventRecorder, - } + c.reconciler = endpointslicerec.NewReconciler( + c.client, + c.nodeLister, + c.maxEndpointsPerSlice, + c.endpointSliceTracker, + c.topologyCache, + c.eventRecorder, + ) return c } @@ -218,11 +218,11 @@ type Controller struct { nodesSynced cache.InformerSynced // reconciler is an util used to reconcile EndpointSlice changes. - reconciler *reconciler + reconciler *endpointslicerec.Reconciler // triggerTimeTracker is an util used to compute and export the // EndpointsLastChangeTriggerTime annotation. - triggerTimeTracker *endpointutil.TriggerTimeTracker + triggerTimeTracker *endpointsliceutil.TriggerTimeTracker // Services that need to be updated. A channel is inappropriate here, // because it allows services with lots of pods to be serviced much @@ -334,7 +334,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error { } c.triggerTimeTracker.DeleteService(namespace, name) - c.reconciler.deleteService(namespace, name) + c.reconciler.DeleteService(namespace, name) c.endpointSliceTracker.DeleteService(namespace, name) // The service has been deleted, return nil so that it won't be retried. return nil @@ -366,7 +366,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error { esLabelSelector := labels.Set(map[string]string{ discovery.LabelServiceName: service.Name, - discovery.LabelManagedBy: controllerName, + discovery.LabelManagedBy: endpointslicerec.GetReconcilerName(), }).AsSelectorPreValidated() endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector) @@ -382,7 +382,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error { endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices) if c.endpointSliceTracker.StaleSlices(service, endpointSlices) { - return endpointsliceutil.NewStaleInformerCache("EndpointSlice informer cache is out of date") + return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date") } // We call ComputeEndpointLastChangeTriggerTime here to make sure that the @@ -391,7 +391,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error { lastChangeTriggerTime := c.triggerTimeTracker. ComputeEndpointLastChangeTriggerTime(namespace, service, pods) - err = c.reconciler.reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime) + err = c.reconciler.Reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime) if err != nil { c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices", "Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err) @@ -432,7 +432,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) { utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()")) return } - if managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) { + if endpointslicerec.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) { c.queueServiceForEndpointSlice(endpointSlice) } } @@ -459,7 +459,7 @@ func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj inte c.queueServiceForEndpointSlice(prevEndpointSlice) return } - if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) { + if endpointslicerec.ManagedByChanged(prevEndpointSlice, endpointSlice) || (endpointslicerec.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) { c.queueServiceForEndpointSlice(endpointSlice) } } @@ -469,7 +469,7 @@ func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj inte // endpointSliceTracker. func (c *Controller) onEndpointSliceDelete(obj interface{}) { endpointSlice := getEndpointSliceFromDeleteAction(obj) - if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) { + if endpointSlice != nil && endpointslicerec.ManagedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) { // This returns false if we didn't expect the EndpointSlice to be // deleted. If that is the case, we queue the Service for another sync. if !c.endpointSliceTracker.HandleDeletion(endpointSlice) { @@ -481,7 +481,7 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) { // queueServiceForEndpointSlice attempts to queue the corresponding Service for // the provided EndpointSlice. func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) { - key, err := serviceControllerKey(endpointSlice) + key, err := endpointslicerec.ServiceControllerKey(endpointSlice) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err)) return @@ -498,7 +498,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo func (c *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) - services, err := endpointutil.GetPodServiceMemberships(c.serviceLister, pod) + services, err := endpointsliceutil.GetPodServiceMemberships(c.serviceLister, pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return @@ -509,7 +509,7 @@ func (c *Controller) addPod(obj interface{}) { } func (c *Controller) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur) + services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur) for key := range services { c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod) } @@ -518,7 +518,7 @@ func (c *Controller) updatePod(old, cur interface{}) { // When a pod is deleted, enqueue the services the pod used to be a member of // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. func (c *Controller) deletePod(obj interface{}) { - pod := endpointutil.GetPodFromDeleteAction(obj) + pod := endpointsliceutil.GetPodFromDeleteAction(obj) if pod != nil { c.addPod(pod) } @@ -567,7 +567,7 @@ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) { func trackSync(err error) { metricLabel := "success" if err != nil { - if endpointsliceutil.IsStaleInformerCacheErr(err) { + if endpointslicepkg.IsStaleInformerCacheErr(err) { metricLabel = "stale" } else { metricLabel = "error" @@ -586,3 +586,34 @@ func dropEndpointSlicesPendingDeletion(endpointSlices []*discovery.EndpointSlice } return endpointSlices[:n] } + +// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action. +func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice { + if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok { + // Enqueue all the services that the pod used to be a member of. + // This is the same thing we do when we add a pod. + return endpointSlice + } + // If we reached here it means the pod was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return nil + } + endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj)) + return nil + } + return endpointSlice +} + +// isNodeReady returns true if a node is ready; false otherwise. +func isNodeReady(node *v1.Node) bool { + for _, c := range node.Status.Conditions { + if c.Type == v1.NodeReady { + return c.Status == v1.ConditionTrue + } + } + return false +} diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go index c129c87e4d6..2b0908a5509 100644 --- a/pkg/controller/endpointslice/endpointslice_controller_test.go +++ b/pkg/controller/endpointslice/endpointslice_controller_test.go @@ -40,11 +40,11 @@ import ( "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + "k8s.io/endpointslice/topologycache" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" + endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice" "k8s.io/utils/pointer" ) @@ -121,6 +121,65 @@ func newController(t *testing.T, nodeNames []string, batchPeriod time.Duration) } } +func newPod(n int, namespace string, ready bool, nPorts int, terminating bool) *v1.Pod { + status := v1.ConditionTrue + if !ready { + status = v1.ConditionFalse + } + + var deletionTimestamp *metav1.Time + if terminating { + deletionTimestamp = &metav1.Time{ + Time: time.Now(), + } + } + + p := &v1.Pod{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("pod%d", n), + Labels: map[string]string{"foo": "bar"}, + DeletionTimestamp: deletionTimestamp, + ResourceVersion: fmt.Sprint(n), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "container-1", + }}, + NodeName: "node-1", + }, + Status: v1.PodStatus{ + PodIP: fmt.Sprintf("1.2.3.%d", 4+n), + PodIPs: []v1.PodIP{{ + IP: fmt.Sprintf("1.2.3.%d", 4+n), + }}, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: status, + }, + }, + }, + } + + return p +} + +func expectActions(t *testing.T, actions []k8stesting.Action, num int, verb, resource string) { + t.Helper() + // if actions are less the below logic will panic + if num > len(actions) { + t.Fatalf("len of actions %v is unexpected. Expected to be at least %v", len(actions), num+1) + } + + for i := 0; i < num; i++ { + relativePos := len(actions) - i - 1 + assert.Equal(t, verb, actions[relativePos].GetVerb(), "Expected action -%d verb to be %s", i, verb) + assert.Equal(t, resource, actions[relativePos].GetResource().Resource, "Expected action -%d resource to be %s", i, resource) + } +} + // Ensure SyncService for service with no selector results in no action func TestSyncServiceNoSelector(t *testing.T) { ns := metav1.NamespaceDefault @@ -260,8 +319,8 @@ func TestSyncServiceMissing(t *testing.T) { // Build up existing service existingServiceName := "stillthere" - existingServiceKey := endpointutil.ServiceKey{Name: existingServiceName, Namespace: namespace} - esController.triggerTimeTracker.ServiceStates[existingServiceKey] = endpointutil.ServiceState{} + existingServiceKey := endpointsliceutil.ServiceKey{Name: existingServiceName, Namespace: namespace} + esController.triggerTimeTracker.ServiceStates[existingServiceKey] = endpointsliceutil.ServiceState{} esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: existingServiceName, Namespace: namespace}, Spec: v1.ServiceSpec{ @@ -272,8 +331,8 @@ func TestSyncServiceMissing(t *testing.T) { // Add missing service to triggerTimeTracker to ensure the reference is cleaned up missingServiceName := "notthere" - missingServiceKey := endpointutil.ServiceKey{Name: missingServiceName, Namespace: namespace} - esController.triggerTimeTracker.ServiceStates[missingServiceKey] = endpointutil.ServiceState{} + missingServiceKey := endpointsliceutil.ServiceKey{Name: missingServiceName, Namespace: namespace} + esController.triggerTimeTracker.ServiceStates[missingServiceKey] = endpointsliceutil.ServiceState{} logger, _ := ktesting.NewTestContext(t) err := esController.syncService(logger, fmt.Sprintf("%s/%s", namespace, missingServiceName)) @@ -1709,7 +1768,7 @@ func TestSyncServiceStaleInformer(t *testing.T) { logger, _ := ktesting.NewTestContext(t) err = esController.syncService(logger, fmt.Sprintf("%s/%s", ns, serviceName)) // Check if we got a StaleInformerCache error - if endpointsliceutil.IsStaleInformerCacheErr(err) != testcase.expectError { + if endpointslicepkg.IsStaleInformerCacheErr(err) != testcase.expectError { t.Fatalf("Expected error because informer cache is outdated") } diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index efd1c994c16..36d0206ec20 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -39,10 +39,11 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" + endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice" ) const ( @@ -318,7 +319,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error { } if c.endpointSliceTracker.StaleSlices(svc, endpointSlices) { - return endpointsliceutil.NewStaleInformerCache("EndpointSlice informer cache is out of date") + return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date") } err = c.reconciler.reconcile(logger, endpoints, endpointSlices) diff --git a/pkg/controller/endpointslicemirroring/metrics/cache.go b/pkg/controller/endpointslicemirroring/metrics/cache.go index 553449687a8..5bd498c018b 100644 --- a/pkg/controller/endpointslicemirroring/metrics/cache.go +++ b/pkg/controller/endpointslicemirroring/metrics/cache.go @@ -21,7 +21,7 @@ import ( "sync" "k8s.io/apimachinery/pkg/types" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" + endpointsliceutil "k8s.io/endpointslice/util" ) // NewCache returns a new Cache with the specified endpointsPerSlice. @@ -53,7 +53,7 @@ type Cache struct { // as the efficiency of EndpointSlice endpoints distribution for each unique // Service Port combination. type EndpointPortCache struct { - items map[endpointutil.PortMapKey]EfficiencyInfo + items map[endpointsliceutil.PortMapKey]EfficiencyInfo } // EfficiencyInfo stores the number of Endpoints and Slices for calculating @@ -67,13 +67,13 @@ type EfficiencyInfo struct { // NewEndpointPortCache initializes and returns a new EndpointPortCache. func NewEndpointPortCache() *EndpointPortCache { return &EndpointPortCache{ - items: map[endpointutil.PortMapKey]EfficiencyInfo{}, + items: map[endpointsliceutil.PortMapKey]EfficiencyInfo{}, } } // Set updates the EndpointPortCache to contain the provided EfficiencyInfo // for the provided PortMapKey. -func (spc *EndpointPortCache) Set(pmKey endpointutil.PortMapKey, eInfo EfficiencyInfo) { +func (spc *EndpointPortCache) Set(pmKey endpointsliceutil.PortMapKey, eInfo EfficiencyInfo) { spc.items[pmKey] = eInfo } diff --git a/pkg/controller/endpointslicemirroring/metrics/cache_test.go b/pkg/controller/endpointslicemirroring/metrics/cache_test.go index 021e3919348..00297194a31 100644 --- a/pkg/controller/endpointslicemirroring/metrics/cache_test.go +++ b/pkg/controller/endpointslicemirroring/metrics/cache_test.go @@ -21,7 +21,7 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" + endpointsliceutil "k8s.io/endpointslice/util" ) func TestNumEndpointsAndSlices(t *testing.T) { @@ -30,8 +30,8 @@ func TestNumEndpointsAndSlices(t *testing.T) { p80 := int32(80) p443 := int32(443) - pmKey80443 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}}) - pmKey80 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}}) + pmKey80443 := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}}) + pmKey80 := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}}) spCacheEfficient := NewEndpointPortCache() spCacheEfficient.Set(pmKey80, EfficiencyInfo{Endpoints: 45, Slices: 1}) diff --git a/pkg/controller/endpointslicemirroring/reconciler.go b/pkg/controller/endpointslicemirroring/reconciler.go index b82323126cb..73016516c69 100644 --- a/pkg/controller/endpointslicemirroring/reconciler.go +++ b/pkg/controller/endpointslicemirroring/reconciler.go @@ -28,11 +28,10 @@ import ( "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2" endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" ) // reconciler is responsible for transforming current EndpointSlice state into @@ -145,7 +144,7 @@ func (r *reconciler) reconcile(logger klog.Logger, endpoints *corev1.Endpoints, slices.append(pmSlices) totals.add(pmTotals) - epMetrics.Set(endpointutil.PortMapKey(portKey), metrics.EfficiencyInfo{ + epMetrics.Set(endpointsliceutil.PortMapKey(portKey), metrics.EfficiencyInfo{ Endpoints: numEndpoints, Slices: len(existingSlicesByKey[portKey]) + len(pmSlices.toCreate) - len(pmSlices.toDelete), }) @@ -324,7 +323,7 @@ func totalChanges(existingSlice *discovery.EndpointSlice, desiredSet endpointsli // If existing version of endpoint doesn't match desired version // increment number of endpoints to be updated. - if !endpointutil.EndpointsEqualBeyondHash(got, &endpoint) { + if !endpointsliceutil.EndpointsEqualBeyondHash(got, &endpoint) { totals.updated++ } } diff --git a/pkg/controller/endpointslicemirroring/reconciler_helpers.go b/pkg/controller/endpointslicemirroring/reconciler_helpers.go index 9f870382a10..fec80578bbe 100644 --- a/pkg/controller/endpointslicemirroring/reconciler_helpers.go +++ b/pkg/controller/endpointslicemirroring/reconciler_helpers.go @@ -19,7 +19,7 @@ package endpointslicemirroring import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" + endpointsliceutil "k8s.io/endpointslice/util" ) // slicesByAction includes lists of slices to create, update, or delete. diff --git a/pkg/controller/endpointslicemirroring/reconciler_test.go b/pkg/controller/endpointslicemirroring/reconciler_test.go index 510826fd954..bcdbc60f25d 100644 --- a/pkg/controller/endpointslicemirroring/reconciler_test.go +++ b/pkg/controller/endpointslicemirroring/reconciler_test.go @@ -28,10 +28,10 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/testutil" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2/ktesting" endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" "k8s.io/utils/pointer" ) diff --git a/pkg/controller/endpointslicemirroring/utils.go b/pkg/controller/endpointslicemirroring/utils.go index 45888664b62..9c5d3ee847b 100644 --- a/pkg/controller/endpointslicemirroring/utils.go +++ b/pkg/controller/endpointslicemirroring/utils.go @@ -27,8 +27,8 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/leaderelection/resourcelock" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/kubernetes/pkg/apis/discovery/validation" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" netutils "k8s.io/utils/net" ) @@ -38,7 +38,7 @@ type addrTypePortMapKey string // newAddrTypePortMapKey generates a PortMapKey from endpoint ports. func newAddrTypePortMapKey(endpointPorts []discovery.EndpointPort, addrType discovery.AddressType) addrTypePortMapKey { - pmk := fmt.Sprintf("%s-%s", addrType, endpointutil.NewPortMapKey(endpointPorts)) + pmk := fmt.Sprintf("%s-%s", addrType, endpointsliceutil.NewPortMapKey(endpointPorts)) return addrTypePortMapKey(pmk) } diff --git a/pkg/controller/util/endpoint/OWNERS b/pkg/controller/util/endpoint/OWNERS deleted file mode 100644 index d75d826496c..00000000000 --- a/pkg/controller/util/endpoint/OWNERS +++ /dev/null @@ -1,15 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -approvers: - - bowei - - freehan - - MrHohn - - thockin - - sig-network-approvers -reviewers: - - robscott - - freehan - - bowei - - sig-network-reviewers -labels: - - sig/network diff --git a/pkg/controller/util/endpointslice/utils.go b/pkg/controller/util/endpointslice/utils.go deleted file mode 100644 index 3cc8b84e4bd..00000000000 --- a/pkg/controller/util/endpointslice/utils.go +++ /dev/null @@ -1,27 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package endpointslice - -import ( - discovery "k8s.io/api/discovery/v1" -) - -// EndpointReady returns true if an Endpoint has the Ready condition set to -// true. -func EndpointReady(endpoint discovery.Endpoint) bool { - return endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready -} diff --git a/staging/src/k8s.io/endpointslice/OWNERS b/staging/src/k8s.io/endpointslice/OWNERS index 78c13d3ce76..675d11afb44 100644 --- a/staging/src/k8s.io/endpointslice/OWNERS +++ b/staging/src/k8s.io/endpointslice/OWNERS @@ -1,16 +1,8 @@ # See the OWNERS docs at https://go.k8s.io/owners approvers: - - bowei - - freehan - - MrHohn - - thockin - - robscott - sig-network-approvers reviewers: - - robscott - - freehan - - bowei - sig-network-reviewers labels: - sig/network diff --git a/staging/src/k8s.io/endpointslice/README.md b/staging/src/k8s.io/endpointslice/README.md index f82de4d9d0f..98f71dbb3ce 100644 --- a/staging/src/k8s.io/endpointslice/README.md +++ b/staging/src/k8s.io/endpointslice/README.md @@ -5,6 +5,13 @@ This repository contains packages related to the [EndpointSlices](https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/0752-endpointslices) feature. +This EndpointSlice reconciler library is not sufficiently generic to be used by +the EndpointSlice Mirroring controller. The reconciler in the EndpointSlice +mirroring controller has a 1:1 mapping between Service/Endpoints and +EndpointSlice, which results in a simpler implementation then the EndpointSlice +staging lib. Contributions to move towards the shared code being used by the +mirroring controller would be welcome. + ## Compatibility There are *NO compatibility guarantees* for this repository, yet. It is in direct support of Kubernetes, so branches diff --git a/staging/src/k8s.io/endpointslice/go.mod b/staging/src/k8s.io/endpointslice/go.mod index 42d1302bcd0..2beeed7cc5d 100644 --- a/staging/src/k8s.io/endpointslice/go.mod +++ b/staging/src/k8s.io/endpointslice/go.mod @@ -4,4 +4,69 @@ module k8s.io/endpointslice go 1.20 -replace k8s.io/endpointslice => ../endpointslice +require ( + github.com/davecgh/go-spew v1.1.1 + github.com/google/go-cmp v0.5.9 + github.com/stretchr/testify v1.8.2 + k8s.io/api v0.0.0 + k8s.io/apimachinery v0.0.0 + k8s.io/client-go v0.0.0 + k8s.io/component-base v0.0.0 + k8s.io/klog/v2 v2.100.1 + k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/blang/semver/v4 v4.0.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-openapi/jsonpointer v0.19.6 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/swag v0.22.3 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gofuzz v1.2.0 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/oauth2 v0.6.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/term v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + golang.org/x/time v0.3.0 // indirect + google.golang.org/appengine v1.6.7 // indirect + google.golang.org/protobuf v1.30.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/kube-openapi v0.0.0-20230601164746-7562a1006961 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect + sigs.k8s.io/yaml v1.3.0 // indirect +) + +replace ( + k8s.io/api => ../api + k8s.io/apimachinery => ../apimachinery + k8s.io/client-go => ../client-go + k8s.io/component-base => ../component-base + k8s.io/endpointslice => ../endpointslice +) diff --git a/staging/src/k8s.io/endpointslice/go.sum b/staging/src/k8s.io/endpointslice/go.sum index e69de29bb2d..0cc3c215ffa 100644 --- a/staging/src/k8s.io/endpointslice/go.sum +++ b/staging/src/k8s.io/endpointslice/go.sum @@ -0,0 +1,221 @@ +cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= +github.com/alecthomas/kingpin/v2 v2.3.1/go.mod h1:oYL5vtsvEHZGHxU7DMp32Dvx+qL+ptGn6lWaot2vCNE= +github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= +github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= +github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= +github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-logr/zapr v1.2.3/go.mod h1:eIauM6P8qSvTw5o2ez6UEAfGjQKrxQTl5EoK+Qa2oG4= +github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= +github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= +github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= +github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= +github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= +github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= +github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= +github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= +github.com/moby/term v0.0.0-20221205130635-1aeaba878587/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= +github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM= +github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= +github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= +github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/xhit/go-str2duration v1.2.0/go.mod h1:3cPSlfZlUHVlneIVfePFWcJZsuwf+P1v2SRTV4cUmp4= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.1/go.mod h1:9NiG9I2aHTKkcxqCILhjtyNA1QEiCjdBACv4IvrFQ+c= +go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0/go.mod h1:Krqnjl22jUJ0HgMzw5eveuCvFDXY4nSYb4F8t5gdrag= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0/go.mod h1:OfUCyyIiDvNXHWpcWgbF+MWvqPZiNa3YDEnivcnYsV0= +go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A= +go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE= +go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM= +go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= +golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= +golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= +google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= +k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= +k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/kube-openapi v0.0.0-20230601164746-7562a1006961 h1:pqRVJGQJz6oeZby8qmPKXYIBjyrcv7EHCe/33UkZMYA= +k8s.io/kube-openapi v0.0.0-20230601164746-7562a1006961/go.mod h1:l8HTwL5fqnlns4jOveW1L75eo7R9KFHxiE0bsPGy428= +k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= +k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= +sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/pkg/controller/endpointslice/metrics/cache.go b/staging/src/k8s.io/endpointslice/metrics/cache.go similarity index 95% rename from pkg/controller/endpointslice/metrics/cache.go rename to staging/src/k8s.io/endpointslice/metrics/cache.go index 2bd22316167..e2681f262fe 100644 --- a/pkg/controller/endpointslice/metrics/cache.go +++ b/staging/src/k8s.io/endpointslice/metrics/cache.go @@ -21,7 +21,7 @@ import ( "sync" "k8s.io/apimachinery/pkg/types" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" + endpointsliceutil "k8s.io/endpointslice/util" ) // NewCache returns a new Cache with the specified endpointsPerSlice. @@ -58,7 +58,7 @@ type Cache struct { // as the efficiency of EndpointSlice endpoints distribution for each unique // Service Port combination. type ServicePortCache struct { - items map[endpointutil.PortMapKey]EfficiencyInfo + items map[endpointsliceutil.PortMapKey]EfficiencyInfo } // EfficiencyInfo stores the number of Endpoints and Slices for calculating @@ -72,13 +72,13 @@ type EfficiencyInfo struct { // NewServicePortCache initializes and returns a new ServicePortCache. func NewServicePortCache() *ServicePortCache { return &ServicePortCache{ - items: map[endpointutil.PortMapKey]EfficiencyInfo{}, + items: map[endpointsliceutil.PortMapKey]EfficiencyInfo{}, } } // Set updates the ServicePortCache to contain the provided EfficiencyInfo // for the provided PortMapKey. -func (spc *ServicePortCache) Set(pmKey endpointutil.PortMapKey, eInfo EfficiencyInfo) { +func (spc *ServicePortCache) Set(pmKey endpointsliceutil.PortMapKey, eInfo EfficiencyInfo) { spc.items[pmKey] = eInfo } diff --git a/pkg/controller/endpointslice/metrics/cache_test.go b/staging/src/k8s.io/endpointslice/metrics/cache_test.go similarity index 84% rename from pkg/controller/endpointslice/metrics/cache_test.go rename to staging/src/k8s.io/endpointslice/metrics/cache_test.go index 2f4839d4ea8..bbf400852f2 100644 --- a/pkg/controller/endpointslice/metrics/cache_test.go +++ b/staging/src/k8s.io/endpointslice/metrics/cache_test.go @@ -22,7 +22,7 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/kubernetes/pkg/controller/util/endpoint" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/utils/pointer" ) @@ -32,8 +32,8 @@ func TestNumEndpointsAndSlices(t *testing.T) { p80 := int32(80) p443 := int32(443) - pmKey80443 := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}}) - pmKey80 := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}}) + pmKey80443 := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}}) + pmKey80 := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}}) spCacheEfficient := NewServicePortCache() spCacheEfficient.Set(pmKey80, EfficiencyInfo{Endpoints: 45, Slices: 1}) @@ -65,8 +65,8 @@ func TestPlaceHolderSlice(t *testing.T) { p80 := int32(80) p443 := int32(443) - pmKey80443 := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}}) - pmKey80 := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}}) + pmKey80443 := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}}) + pmKey80 := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}}) sp := NewServicePortCache() sp.Set(pmKey80, EfficiencyInfo{Endpoints: 0, Slices: 1}) @@ -92,9 +92,9 @@ func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int func benchmarkUpdateServicePortCache(b *testing.B, num int) { c := NewCache(int32(100)) ns := "benchmark" - httpKey := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: pointer.Int32(80)}}) - httpsKey := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: pointer.Int32(443)}}) - spCache := &ServicePortCache{items: map[endpoint.PortMapKey]EfficiencyInfo{ + httpKey := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: pointer.Int32(80)}}) + httpsKey := endpointsliceutil.NewPortMapKey([]discovery.EndpointPort{{Port: pointer.Int32(443)}}) + spCache := &ServicePortCache{items: map[endpointsliceutil.PortMapKey]EfficiencyInfo{ httpKey: { Endpoints: 182, Slices: 2, diff --git a/pkg/controller/endpointslice/metrics/metrics.go b/staging/src/k8s.io/endpointslice/metrics/metrics.go similarity index 100% rename from pkg/controller/endpointslice/metrics/metrics.go rename to staging/src/k8s.io/endpointslice/metrics/metrics.go diff --git a/pkg/controller/endpointslice/reconciler.go b/staging/src/k8s.io/endpointslice/reconciler.go similarity index 90% rename from pkg/controller/endpointslice/reconciler.go rename to staging/src/k8s.io/endpointslice/reconciler.go index 9f51c491a4e..827bcdc01a2 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/staging/src/k8s.io/endpointslice/reconciler.go @@ -33,16 +33,21 @@ import ( clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" + "k8s.io/endpointslice/metrics" + "k8s.io/endpointslice/topologycache" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" - "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" ) -// reconciler is responsible for transforming current EndpointSlice state into +const ( + // controllerName is a unique value used with LabelManagedBy to indicated + // the component managing an EndpointSlice. + controllerName = "endpointslice-controller.k8s.io" +) + +// Reconciler is responsible for transforming current EndpointSlice state into // desired state -type reconciler struct { +type Reconciler struct { client clientset.Interface nodeLister corelisters.NodeLister maxEndpointsPerSlice int32 @@ -51,22 +56,22 @@ type reconciler struct { // topologyCache tracks the distribution of Nodes and endpoints across zones // to enable TopologyAwareHints. topologyCache *topologycache.TopologyCache - // eventRecorder allows reconciler to record and publish events. + // eventRecorder allows Reconciler to record and publish events. eventRecorder record.EventRecorder } // endpointMeta includes the attributes we group slices on, this type helps with -// that logic in reconciler +// that logic in Reconciler type endpointMeta struct { ports []discovery.EndpointPort addressType discovery.AddressType } -// reconcile takes a set of pods currently matching a service selector and +// Reconcile takes a set of pods currently matching a service selector and // compares them with the endpoints already present in any existing endpoint // slices for the given service. It creates, updates, or deletes endpoint slices // to ensure the desired set of pods are represented by endpoint slices. -func (r *reconciler) reconcile(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error { +func (r *Reconciler) Reconcile(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error { slicesToDelete := []*discovery.EndpointSlice{} // slices that are no longer matching any address the service has errs := []error{} // all errors generated in the process of reconciling slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice) // slices by address type @@ -76,13 +81,13 @@ func (r *reconciler) reconcile(logger klog.Logger, service *corev1.Service, pods // loop through slices identifying their address type. // slices that no longer match address type supported by services - // go to delete, other slices goes to the reconciler machinery + // go to delete, other slices goes to the Reconciler machinery // for further adjustment for _, existingSlice := range existingSlices { // service no longer supports that address type, add it to deleted slices if !serviceSupportedAddressesTypes.Has(existingSlice.AddressType) { if r.topologyCache != nil { - svcKey, err := serviceControllerKey(existingSlice) + svcKey, err := ServiceControllerKey(existingSlice) if err != nil { logger.Info("Couldn't get key to remove EndpointSlice from topology cache", "existingSlice", existingSlice, "err", err) } else { @@ -130,7 +135,7 @@ func (r *reconciler) reconcile(logger klog.Logger, service *corev1.Service, pods // compares them with the endpoints already present in any existing endpoint // slices (by address type) for the given service. It creates, updates, or deletes endpoint slices // to ensure the desired set of pods are represented by endpoint slices. -func (r *reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error { +func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error { errs := []error{} slicesToCreate := []*discovery.EndpointSlice{} @@ -139,10 +144,10 @@ func (r *reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. events := []*topologycache.EventBuilder{} // Build data structures for existing state. - existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{} + existingSlicesByPortMap := map[endpointsliceutil.PortMapKey][]*discovery.EndpointSlice{} for _, existingSlice := range existingSlices { if ownedBy(existingSlice, service) { - epHash := endpointutil.NewPortMapKey(existingSlice.Ports) + epHash := endpointsliceutil.NewPortMapKey(existingSlice.Ports) existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice) } else { slicesToDelete = append(slicesToDelete, existingSlice) @@ -150,16 +155,16 @@ func (r *reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. } // Build data structures for desired state. - desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{} - desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointsliceutil.EndpointSet{} + desiredMetaByPortMap := map[endpointsliceutil.PortMapKey]*endpointMeta{} + desiredEndpointsByPortMap := map[endpointsliceutil.PortMapKey]endpointsliceutil.EndpointSet{} for _, pod := range pods { - if !endpointutil.ShouldPodBeInEndpoints(pod, true) { + if !endpointsliceutil.ShouldPodBeInEndpoints(pod, true) { continue } endpointPorts := getEndpointPorts(logger, service, pod) - epHash := endpointutil.NewPortMapKey(endpointPorts) + epHash := endpointsliceutil.NewPortMapKey(endpointPorts) if _, ok := desiredEndpointsByPortMap[epHash]; !ok { desiredEndpointsByPortMap[epHash] = endpointsliceutil.EndpointSet{} } @@ -238,7 +243,7 @@ func (r *reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. } else { slicesToCreate = append(slicesToCreate, placeholderSlice) } - spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{ + spMetrics.Set(endpointsliceutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{ Endpoints: 0, Slices: 1, }) @@ -288,6 +293,22 @@ func (r *reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. } +func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, eventRecorder record.EventRecorder) *Reconciler { + return &Reconciler{ + client: client, + nodeLister: nodeLister, + maxEndpointsPerSlice: maxEndpointsPerSlice, + endpointSliceTracker: endpointSliceTracker, + metricsCache: metrics.NewCache(maxEndpointsPerSlice), + topologyCache: topologyCache, + eventRecorder: eventRecorder, + } +} + +func GetReconcilerName() string { + return controllerName +} + // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices. // It only compares the specific fields we care about. var placeholderSliceCompare = conversion.EqualitiesOrDie( @@ -313,7 +334,7 @@ var placeholderSliceCompare = conversion.EqualitiesOrDie( ) // finalize creates, updates, and deletes slices as specified -func (r *reconciler) finalize( +func (r *Reconciler) finalize( service *corev1.Service, slicesToCreate, slicesToUpdate, @@ -406,7 +427,7 @@ func (r *reconciler) finalize( // any remaining desired endpoints. // 3. If there still desired endpoints left, try to fit them into a previously // unchanged slice and/or create new ones. -func (r *reconciler) reconcileByPortMapping( +func (r *Reconciler) reconcileByPortMapping( logger klog.Logger, service *corev1.Service, existingSlices []*discovery.EndpointSlice, @@ -432,7 +453,7 @@ func (r *reconciler) reconcileByPortMapping( newEndpoints = append(newEndpoints, *got) // If existing version of endpoint doesn't match desired version // set endpointUpdated to ensure endpoint changes are persisted. - if !endpointutil.EndpointsEqualBeyondHash(got, &endpoint) { + if !endpointsliceutil.EndpointsEqualBeyondHash(got, &endpoint) { endpointUpdated = true } // once an endpoint has been placed/found in a slice, it no @@ -553,6 +574,6 @@ func (r *reconciler) reconcileByPortMapping( return slicesToCreate, slicesToUpdate, slicesToDelete, numAdded, numRemoved } -func (r *reconciler) deleteService(namespace, name string) { +func (r *Reconciler) DeleteService(namespace, name string) { r.metricsCache.DeleteService(types.NamespacedName{Namespace: namespace, Name: name}) } diff --git a/pkg/controller/endpointslice/reconciler_test.go b/staging/src/k8s.io/endpointslice/reconciler_test.go similarity index 96% rename from pkg/controller/endpointslice/reconciler_test.go rename to staging/src/k8s.io/endpointslice/reconciler_test.go index 4374e0f50bb..f2992729068 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/staging/src/k8s.io/endpointslice/reconciler_test.go @@ -30,6 +30,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/informers" @@ -38,14 +39,71 @@ import ( k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/testutil" + "k8s.io/endpointslice/metrics" + "k8s.io/endpointslice/topologycache" + endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2/ktesting" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" - "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" "k8s.io/utils/pointer" ) +func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, resource string) { + t.Helper() + if len(actions) <= index { + t.Fatalf("Expected at least %d actions, got %d", index+1, len(actions)) + } + + action := actions[index] + if action.GetVerb() != verb { + t.Errorf("Expected action %d verb to be %s, got %s", index, verb, action.GetVerb()) + } + + if action.GetResource().Resource != resource { + t.Errorf("Expected action %d resource to be %s, got %s", index, resource, action.GetResource().Resource) + } +} + +// cacheMutationCheck helps ensure that cached objects have not been changed +// in any way throughout a test run. +type cacheMutationCheck struct { + objects []cacheObject +} + +// cacheObject stores a reference to an original object as well as a deep copy +// of that object to track any mutations in the original object. +type cacheObject struct { + original runtime.Object + deepCopy runtime.Object +} + +// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices. +func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck { + cmc := cacheMutationCheck{} + for _, endpointSlice := range endpointSlices { + cmc.Add(endpointSlice) + } + return cmc +} + +// Add appends a runtime.Object and a deep copy of that object into the +// cacheMutationCheck. +func (cmc *cacheMutationCheck) Add(o runtime.Object) { + cmc.objects = append(cmc.objects, cacheObject{ + original: o, + deepCopy: o.DeepCopyObject(), + }) +} + +// Check verifies that no objects in the cacheMutationCheck have been mutated. +func (cmc *cacheMutationCheck) Check(t *testing.T) { + for _, o := range cmc.objects { + if !reflect.DeepEqual(o.original, o.deepCopy) { + // Cached objects can't be safely mutated and instead should be deep + // copied before changed in any way. + t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original) + } + } +} + var defaultMaxEndpointsPerSlice = int32(100) // Even when there are no pods, we want to have a placeholder slice for each service @@ -652,7 +710,7 @@ func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) { endpointSlices := fetchEndpointSlices(t, client, namespace) for _, endpointSlice := range endpointSlices { for i, endpoint := range endpointSlice.Endpoints { - if !endpointsliceutil.EndpointReady(endpoint) { + if !topologycache.EndpointReady(endpoint) { t.Errorf("Expected endpoints[%d] to be ready", i) } } @@ -1623,7 +1681,7 @@ func TestReconcilerPodMissingNode(t *testing.T) { t.Errorf("Expected no error creating endpoint slice") } } - err := r.reconcile(logger, svc, pods, existingSlices, time.Now()) + err := r.Reconcile(logger, svc, pods, existingSlices, time.Now()) if err == nil && tc.expectError { t.Errorf("Expected error but no error received") } @@ -1849,7 +1907,7 @@ func TestReconcileTopology(t *testing.T) { service.Annotations = map[string]string{ corev1.DeprecatedAnnotationTopologyAwareHints: tc.hintsAnnotation, } - r.reconcile(logger, service, tc.pods, tc.existingSlices, time.Now()) + r.Reconcile(logger, service, tc.pods, tc.existingSlices, time.Now()) cmc.Check(t) expectMetrics(t, tc.expectedMetrics) @@ -1912,23 +1970,23 @@ func TestReconcileTopology(t *testing.T) { // Test Helpers -func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler { +func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *Reconciler { eventRecorder := record.NewFakeRecorder(10) - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + informerFactory := informers.NewSharedInformerFactory(client, 0) nodeInformer := informerFactory.Core().V1().Nodes() indexer := nodeInformer.Informer().GetIndexer() for _, node := range nodes { indexer.Add(node) } - return &reconciler{ - client: client, - nodeLister: corelisters.NewNodeLister(indexer), - maxEndpointsPerSlice: maxEndpointsPerSlice, - endpointSliceTracker: endpointsliceutil.NewEndpointSliceTracker(), - metricsCache: metrics.NewCache(maxEndpointsPerSlice), - eventRecorder: eventRecorder, - } + return NewReconciler( + client, + corelisters.NewNodeLister(indexer), + maxEndpointsPerSlice, + endpointsliceutil.NewEndpointSliceTracker(), + nil, + eventRecorder, + ) } // ensures endpoint slices exist with the desired set of lengths @@ -2037,10 +2095,10 @@ func fetchEndpointSlices(t *testing.T, client *fake.Clientset, namespace string) return fetchedSlices.Items } -func reconcileHelper(t *testing.T, r *reconciler, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) { +func reconcileHelper(t *testing.T, r *Reconciler, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) { logger, _ := ktesting.NewTestContext(t) t.Helper() - err := r.reconcile(logger, service, pods, existingSlices, triggerTime) + err := r.Reconcile(logger, service, pods, existingSlices, triggerTime) if err != nil { t.Fatalf("Expected no error reconciling Endpoint Slices, got: %v", err) } diff --git a/pkg/controller/endpointslice/topologycache/event.go b/staging/src/k8s.io/endpointslice/topologycache/event.go similarity index 100% rename from pkg/controller/endpointslice/topologycache/event.go rename to staging/src/k8s.io/endpointslice/topologycache/event.go diff --git a/pkg/controller/endpointslice/topologycache/sliceinfo.go b/staging/src/k8s.io/endpointslice/topologycache/sliceinfo.go similarity index 100% rename from pkg/controller/endpointslice/topologycache/sliceinfo.go rename to staging/src/k8s.io/endpointslice/topologycache/sliceinfo.go diff --git a/pkg/controller/endpointslice/topologycache/sliceinfo_test.go b/staging/src/k8s.io/endpointslice/topologycache/sliceinfo_test.go similarity index 100% rename from pkg/controller/endpointslice/topologycache/sliceinfo_test.go rename to staging/src/k8s.io/endpointslice/topologycache/sliceinfo_test.go diff --git a/pkg/controller/endpointslice/topologycache/topologycache.go b/staging/src/k8s.io/endpointslice/topologycache/topologycache.go similarity index 98% rename from pkg/controller/endpointslice/topologycache/topologycache.go rename to staging/src/k8s.io/endpointslice/topologycache/topologycache.go index 3256c8b7e4d..12a4678383b 100644 --- a/pkg/controller/endpointslice/topologycache/topologycache.go +++ b/staging/src/k8s.io/endpointslice/topologycache/topologycache.go @@ -26,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" ) const ( @@ -111,7 +110,7 @@ func (t *TopologyCache) AddHints(logger klog.Logger, si *SliceInfo) ([]*discover // step 1: assign same-zone hints for all endpoints as a starting point. for _, slice := range allocatableSlices { for i, endpoint := range slice.Endpoints { - if !endpointsliceutil.EndpointReady(endpoint) { + if !EndpointReady(endpoint) { endpoint.Hints = nil continue } diff --git a/pkg/controller/endpointslice/topologycache/topologycache_test.go b/staging/src/k8s.io/endpointslice/topologycache/topologycache_test.go similarity index 100% rename from pkg/controller/endpointslice/topologycache/topologycache_test.go rename to staging/src/k8s.io/endpointslice/topologycache/topologycache_test.go diff --git a/pkg/controller/endpointslice/topologycache/utils.go b/staging/src/k8s.io/endpointslice/topologycache/utils.go similarity index 96% rename from pkg/controller/endpointslice/topologycache/utils.go rename to staging/src/k8s.io/endpointslice/topologycache/utils.go index 45600609c74..a716e4ac670 100644 --- a/pkg/controller/endpointslice/topologycache/utils.go +++ b/staging/src/k8s.io/endpointslice/topologycache/utils.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/klog/v2" - endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" ) // RemoveHintsFromSlices removes topology hints on EndpointSlices and returns @@ -82,7 +81,7 @@ func redistributeHints(logger klog.Logger, slices []*discovery.EndpointSlice, gi for _, slice := range slices { for i, endpoint := range slice.Endpoints { - if !endpointsliceutil.EndpointReady(endpoint) { + if !EndpointReady(endpoint) { endpoint.Hints = nil continue } @@ -196,7 +195,7 @@ func getMost(zones map[string]float64) (string, float64) { func getHintsByZone(slice *discovery.EndpointSlice, allocatedHintsByZone EndpointZoneInfo, allocations map[string]allocation) map[string]int { hintsByZone := map[string]int{} for _, endpoint := range slice.Endpoints { - if !endpointsliceutil.EndpointReady(endpoint) { + if !EndpointReady(endpoint) { continue } if endpoint.Hints == nil || len(endpoint.Hints.ForZones) == 0 { @@ -273,3 +272,9 @@ func numReadyEndpoints(endpoints []discovery.Endpoint) int { } return total } + +// EndpointReady returns true if an Endpoint has the Ready condition set to +// true. +func EndpointReady(endpoint discovery.Endpoint) bool { + return endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready +} diff --git a/pkg/controller/endpointslice/topologycache/utils_test.go b/staging/src/k8s.io/endpointslice/topologycache/utils_test.go similarity index 100% rename from pkg/controller/endpointslice/topologycache/utils_test.go rename to staging/src/k8s.io/endpointslice/topologycache/utils_test.go diff --git a/pkg/controller/util/endpoint/controller_utils.go b/staging/src/k8s.io/endpointslice/util/controller_utils.go similarity index 79% rename from pkg/controller/util/endpoint/controller_utils.go rename to staging/src/k8s.io/endpointslice/util/controller_utils.go index 016e493824d..8b2bfb0bf38 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/staging/src/k8s.io/endpointslice/util/controller_utils.go @@ -14,15 +14,17 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoint +package util import ( "crypto/md5" "encoding/hex" "fmt" + "hash" "reflect" "sort" + "github.com/davecgh/go-spew/spew" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/conversion" @@ -31,9 +33,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/util/hash" ) // semanticIgnoreResourceVersion does semantic deep equality checks for objects @@ -62,7 +61,7 @@ func GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod // if the service has a nil selector this means selectors match nothing, not everything. continue } - key, err := controller.KeyFunc(service) + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(service) if err != nil { return nil, err } @@ -79,13 +78,13 @@ type PortMapKey string // NewPortMapKey generates a PortMapKey from endpoint ports. func NewPortMapKey(endpointPorts []discovery.EndpointPort) PortMapKey { sort.Sort(portsInOrder(endpointPorts)) - return PortMapKey(DeepHashObjectToString(endpointPorts)) + return PortMapKey(deepHashObjectToString(endpointPorts)) } -// DeepHashObjectToString creates a unique hash string from a go object. -func DeepHashObjectToString(objectToWrite interface{}) string { +// deepHashObjectToString creates a unique hash string from a go object. +func deepHashObjectToString(objectToWrite interface{}) string { hasher := md5.New() - hash.DeepHashObject(hasher, objectToWrite) + deepHashObject(hasher, objectToWrite) return hex.EncodeToString(hasher.Sum(nil)[0:]) } @@ -96,7 +95,7 @@ func ShouldPodBeInEndpoints(pod *v1.Pod, includeTerminating bool) bool { // "Terminal" describes when a Pod is complete (in a succeeded or failed phase). // This is distinct from the "Terminating" condition which represents when a Pod // is being terminated (metadata.deletionTimestamp is non nil). - if podutil.IsPodTerminal(pod) { + if isPodTerminal(pod) { return false } @@ -137,7 +136,7 @@ func podEndpointsChanged(oldPod, newPod *v1.Pod) (bool, bool) { // will move from the unready endpoints set to the ready endpoints. // So for the purposes of an endpoint, a readiness change on a pod // means we have a changed pod. - if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) { + if IsPodReady(oldPod) != IsPodReady(newPod) { return true, labelsChanged } @@ -238,8 +237,8 @@ type portsInOrder []discovery.EndpointPort func (sl portsInOrder) Len() int { return len(sl) } func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] } func (sl portsInOrder) Less(i, j int) bool { - h1 := DeepHashObjectToString(sl[i]) - h2 := DeepHashObjectToString(sl[j]) + h1 := deepHashObjectToString(sl[i]) + h2 := deepHashObjectToString(sl[j]) return h1 < h2 } @@ -297,8 +296,58 @@ func stringPtrChanged(ptr1, ptr2 *string) bool { return false } -// EndpointSubsetsEqualIgnoreResourceVersion returns true if EndpointSubsets -// have equal attributes but excludes ResourceVersion of Pod. -func EndpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool { - return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2) +// DeepHashObject writes specified object to hash using the spew library +// which follows pointers and prints actual values of the nested objects +// ensuring the hash does not change when a pointer changes. +// copied from k8s.io/kubernetes/pkg/util/hash +func deepHashObject(hasher hash.Hash, objectToWrite interface{}) { + hasher.Reset() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + printer.Fprintf(hasher, "%#v", objectToWrite) +} + +// IsPodReady returns true if Pods Ready condition is true +// copied from k8s.io/kubernetes/pkg/api/v1/pod +func IsPodReady(pod *v1.Pod) bool { + return isPodReadyConditionTrue(pod.Status) +} + +// IsPodTerminal returns true if a pod is terminal, all containers are stopped and cannot ever regress. +// copied from k8s.io/kubernetes/pkg/api/v1/pod +func isPodTerminal(pod *v1.Pod) bool { + return isPodPhaseTerminal(pod.Status.Phase) +} + +// IsPodPhaseTerminal returns true if the pod's phase is terminal. +// copied from k8s.io/kubernetes/pkg/api/v1/pod +func isPodPhaseTerminal(phase v1.PodPhase) bool { + return phase == v1.PodFailed || phase == v1.PodSucceeded +} + +// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise. +// copied from k8s.io/kubernetes/pkg/api/v1/pod +func isPodReadyConditionTrue(status v1.PodStatus) bool { + condition := getPodReadyCondition(&status) + return condition != nil && condition.Status == v1.ConditionTrue +} + +// getPodReadyCondition extracts the pod ready condition from the given status and returns that. +// Returns nil if the condition is not present. +// copied from k8s.io/kubernetes/pkg/api/v1/pod +func getPodReadyCondition(status *v1.PodStatus) *v1.PodCondition { + if status == nil || status.Conditions == nil { + return nil + } + + for i := range status.Conditions { + if status.Conditions[i].Type == v1.PodReady { + return &status.Conditions[i] + } + } + return nil } diff --git a/pkg/controller/util/endpoint/controller_utils_test.go b/staging/src/k8s.io/endpointslice/util/controller_utils_test.go similarity index 85% rename from pkg/controller/util/endpoint/controller_utils_test.go rename to staging/src/k8s.io/endpointslice/util/controller_utils_test.go index 43f27f1d3ae..8ce4022aa43 100644 --- a/pkg/controller/util/endpoint/controller_utils_test.go +++ b/staging/src/k8s.io/endpointslice/util/controller_utils_test.go @@ -14,16 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoint +package util import ( "fmt" + "hash/adler32" "testing" "time" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/dump" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -825,100 +827,124 @@ func TestEndpointsEqualBeyondHash(t *testing.T) { } } -func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) { - copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset { - newSubSet := orig.DeepCopy() - mutator(newSubSet) - return newSubSet +type A struct { + x int + y string +} + +type B struct { + x []int + y map[string]bool +} + +type C struct { + x int + y string +} + +func (c C) String() string { + return fmt.Sprintf("%d:%s", c.x, c.y) +} + +func TestDeepHashObject(t *testing.T) { + successCases := []func() interface{}{ + func() interface{} { return 8675309 }, + func() interface{} { return "Jenny, I got your number" }, + func() interface{} { return []string{"eight", "six", "seven"} }, + func() interface{} { return [...]int{5, 3, 0, 9} }, + func() interface{} { return map[int]string{8: "8", 6: "6", 7: "7"} }, + func() interface{} { return map[string]int{"5": 5, "3": 3, "0": 0, "9": 9} }, + func() interface{} { return A{867, "5309"} }, + func() interface{} { return &A{867, "5309"} }, + func() interface{} { + return B{[]int{8, 6, 7}, map[string]bool{"5": true, "3": true, "0": true, "9": true}} + }, + func() interface{} { return map[A]bool{{8675309, "Jenny"}: true, {9765683, "!Jenny"}: false} }, + func() interface{} { return map[C]bool{{8675309, "Jenny"}: true, {9765683, "!Jenny"}: false} }, + func() interface{} { return map[*A]bool{{8675309, "Jenny"}: true, {9765683, "!Jenny"}: false} }, + func() interface{} { return map[*C]bool{{8675309, "Jenny"}: true, {9765683, "!Jenny"}: false} }, } - es1 := &v1.EndpointSubset{ - Addresses: []v1.EndpointAddress{ - { - IP: "1.1.1.1", - TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"}, - }, - }, - NotReadyAddresses: []v1.EndpointAddress{ - { - IP: "1.1.1.2", - TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"}, - }, - }, - Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}}, - } - es2 := &v1.EndpointSubset{ - Addresses: []v1.EndpointAddress{ - { - IP: "2.2.2.1", - TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"}, - }, - }, - NotReadyAddresses: []v1.EndpointAddress{ - { - IP: "2.2.2.2", - TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"}, - }, - }, - Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}}, - } - tests := []struct { - name string - subsets1 []v1.EndpointSubset - subsets2 []v1.EndpointSubset - expected bool - }{ - { - name: "Subsets removed", - subsets1: []v1.EndpointSubset{*es1, *es2}, - subsets2: []v1.EndpointSubset{*es1}, - expected: false, - }, - { - name: "Ready Pod IP changed", - subsets1: []v1.EndpointSubset{*es1, *es2}, - subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) { - es.Addresses[0].IP = "1.1.1.10" - }), *es2}, - expected: false, - }, - { - name: "NotReady Pod IP changed", - subsets1: []v1.EndpointSubset{*es1, *es2}, - subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { - es.NotReadyAddresses[0].IP = "2.2.2.10" - })}, - expected: false, - }, - { - name: "Pod ResourceVersion changed", - subsets1: []v1.EndpointSubset{*es1, *es2}, - subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { - es.Addresses[0].TargetRef.ResourceVersion = "100" - })}, - expected: true, - }, - { - name: "Pod ResourceVersion removed", - subsets1: []v1.EndpointSubset{*es1, *es2}, - subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { - es.Addresses[0].TargetRef.ResourceVersion = "" - })}, - expected: true, - }, - { - name: "Ports changed", - subsets1: []v1.EndpointSubset{*es1, *es2}, - subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) { - es.Ports[0].Port = 8082 - })}, - expected: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := EndpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected { - t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected) + + for _, tc := range successCases { + hasher1 := adler32.New() + deepHashObject(hasher1, tc()) + hash1 := hasher1.Sum32() + deepHashObject(hasher1, tc()) + hash2 := hasher1.Sum32() + if hash1 != hash2 { + t.Fatalf("hash of the same object (%q) produced different results: %d vs %d", toString(tc()), hash1, hash2) + } + for i := 0; i < 100; i++ { + hasher2 := adler32.New() + + deepHashObject(hasher1, tc()) + hash1a := hasher1.Sum32() + deepHashObject(hasher2, tc()) + hash2a := hasher2.Sum32() + + if hash1a != hash1 { + t.Errorf("repeated hash of the same object (%q) produced different results: %d vs %d", toString(tc()), hash1, hash1a) } - }) + if hash2a != hash2 { + t.Errorf("repeated hash of the same object (%q) produced different results: %d vs %d", toString(tc()), hash2, hash2a) + } + if hash1a != hash2a { + t.Errorf("hash of the same object produced (%q) different results: %d vs %d", toString(tc()), hash1a, hash2a) + } + } + } +} + +func toString(obj interface{}) string { + return dump.Pretty(obj) +} + +type wheel struct { + radius uint32 +} + +type unicycle struct { + primaryWheel *wheel + licencePlateID string + tags map[string]string +} + +func TestDeepObjectPointer(t *testing.T) { + // Arrange + wheel1 := wheel{radius: 17} + wheel2 := wheel{radius: 22} + wheel3 := wheel{radius: 17} + + myUni1 := unicycle{licencePlateID: "blah", primaryWheel: &wheel1, tags: map[string]string{"color": "blue", "name": "john"}} + myUni2 := unicycle{licencePlateID: "blah", primaryWheel: &wheel2, tags: map[string]string{"color": "blue", "name": "john"}} + myUni3 := unicycle{licencePlateID: "blah", primaryWheel: &wheel3, tags: map[string]string{"color": "blue", "name": "john"}} + + // Run it more than once to verify determinism of hasher. + for i := 0; i < 100; i++ { + hasher1 := adler32.New() + hasher2 := adler32.New() + hasher3 := adler32.New() + // Act + deepHashObject(hasher1, myUni1) + hash1 := hasher1.Sum32() + deepHashObject(hasher1, myUni1) + hash1a := hasher1.Sum32() + deepHashObject(hasher2, myUni2) + hash2 := hasher2.Sum32() + deepHashObject(hasher3, myUni3) + hash3 := hasher3.Sum32() + + // Assert + if hash1 != hash1a { + t.Errorf("repeated hash of the same object produced different results: %d vs %d", hash1, hash1a) + } + + if hash1 == hash2 { + t.Errorf("hash1 (%d) and hash2(%d) must be different because they have different values for wheel size", hash1, hash2) + } + + if hash1 != hash3 { + t.Errorf("hash1 (%d) and hash3(%d) must be the same because although they point to different objects, they have the same values for wheel size", hash1, hash3) + } } } diff --git a/pkg/controller/util/endpointslice/endpointset.go b/staging/src/k8s.io/endpointslice/util/endpointset.go similarity index 94% rename from pkg/controller/util/endpointslice/endpointset.go rename to staging/src/k8s.io/endpointslice/util/endpointset.go index 82a0744847a..44cb47d976f 100644 --- a/pkg/controller/util/endpointslice/endpointset.go +++ b/staging/src/k8s.io/endpointslice/util/endpointset.go @@ -14,13 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpointslice +package util import ( "sort" discovery "k8s.io/api/discovery/v1" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" ) // endpointHash is used to uniquely identify endpoints. Only including addresses @@ -45,7 +44,7 @@ func hashEndpoint(endpoint *discovery.Endpoint) endpointHash { hashObj.Name = endpoint.TargetRef.Name } - return endpointHash(endpointutil.DeepHashObjectToString(hashObj)) + return endpointHash(deepHashObjectToString(hashObj)) } // EndpointSet provides simple methods for comparing sets of Endpoints. diff --git a/pkg/controller/util/endpointslice/endpointslice_tracker.go b/staging/src/k8s.io/endpointslice/util/endpointslice_tracker.go similarity index 99% rename from pkg/controller/util/endpointslice/endpointslice_tracker.go rename to staging/src/k8s.io/endpointslice/util/endpointslice_tracker.go index 06ae0fe932e..846d25ac274 100644 --- a/pkg/controller/util/endpointslice/endpointslice_tracker.go +++ b/staging/src/k8s.io/endpointslice/util/endpointslice_tracker.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpointslice +package util import ( "sync" diff --git a/pkg/controller/util/endpointslice/endpointslice_tracker_test.go b/staging/src/k8s.io/endpointslice/util/endpointslice_tracker_test.go similarity index 99% rename from pkg/controller/util/endpointslice/endpointslice_tracker_test.go rename to staging/src/k8s.io/endpointslice/util/endpointslice_tracker_test.go index efd9ba07499..387c5094696 100644 --- a/pkg/controller/util/endpointslice/endpointslice_tracker_test.go +++ b/staging/src/k8s.io/endpointslice/util/endpointslice_tracker_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpointslice +package util import ( "testing" diff --git a/pkg/controller/util/endpoint/trigger_time_tracker.go b/staging/src/k8s.io/endpointslice/util/trigger_time_tracker.go similarity index 97% rename from pkg/controller/util/endpoint/trigger_time_tracker.go rename to staging/src/k8s.io/endpointslice/util/trigger_time_tracker.go index fb75a4fa935..aac885fc65b 100644 --- a/pkg/controller/util/endpoint/trigger_time_tracker.go +++ b/staging/src/k8s.io/endpointslice/util/trigger_time_tracker.go @@ -14,14 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoint +package util import ( "sync" "time" v1 "k8s.io/api/core/v1" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" ) // TriggerTimeTracker is used to compute an EndpointsLastChangeTriggerTime @@ -140,7 +139,7 @@ func (t *TriggerTimeTracker) DeleteService(namespace, name string) { // getPodTriggerTime returns the time of the pod change (trigger) that resulted // or will result in the endpoint object change. func getPodTriggerTime(pod *v1.Pod) (triggerTime time.Time) { - if readyCondition := podutil.GetPodReadyCondition(pod.Status); readyCondition != nil { + if readyCondition := getPodReadyCondition(&pod.Status); readyCondition != nil { triggerTime = readyCondition.LastTransitionTime.Time } return triggerTime diff --git a/pkg/controller/util/endpoint/trigger_time_tracker_test.go b/staging/src/k8s.io/endpointslice/util/trigger_time_tracker_test.go similarity index 99% rename from pkg/controller/util/endpoint/trigger_time_tracker_test.go rename to staging/src/k8s.io/endpointslice/util/trigger_time_tracker_test.go index c468147db3a..22ccbf88f6a 100644 --- a/pkg/controller/util/endpoint/trigger_time_tracker_test.go +++ b/staging/src/k8s.io/endpointslice/util/trigger_time_tracker_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoint +package util import ( "runtime" diff --git a/pkg/controller/endpointslice/utils.go b/staging/src/k8s.io/endpointslice/utils.go similarity index 86% rename from pkg/controller/endpointslice/utils.go rename to staging/src/k8s.io/endpointslice/utils.go index 0d0dec86ee4..c003774d76f 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/staging/src/k8s.io/endpointslice/utils.go @@ -23,23 +23,19 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" + apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/cache" + endpointutil "k8s.io/endpointslice/util" "k8s.io/klog/v2" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" - api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/apis/core/v1/helper" - "k8s.io/kubernetes/pkg/apis/discovery/validation" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" utilnet "k8s.io/utils/net" ) // podToEndpoint returns an Endpoint object generated from a Pod, a Node, and a Service for a particular addressType. func podToEndpoint(pod *v1.Pod, node *v1.Node, service *v1.Service, addressType discovery.AddressType) discovery.Endpoint { - serving := podutil.IsPodReady(pod) + serving := endpointutil.IsPodReady(pod) terminating := pod.DeletionTimestamp != nil // For compatibility reasons, "ready" should never be "true" if a pod is terminatng, unless // publishNotReadyAddresses was set. @@ -81,7 +77,7 @@ func getEndpointPorts(logger klog.Logger, service *v1.Service, pod *v1.Pod) []di endpointPorts := []discovery.EndpointPort{} // Allow headless service not to have ports. - if len(service.Spec.Ports) == 0 && service.Spec.ClusterIP == api.ClusterIPNone { + if len(service.Spec.Ports) == 0 && service.Spec.ClusterIP == v1.ClusterIPNone { return endpointPorts } @@ -90,7 +86,7 @@ func getEndpointPorts(logger klog.Logger, service *v1.Service, pod *v1.Pod) []di portName := servicePort.Name portProto := servicePort.Protocol - portNum, err := podutil.FindPort(pod, servicePort) + portNum, err := findPort(pod, servicePort) if err != nil { logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "err", err) continue @@ -152,7 +148,7 @@ func newEndpointSlice(logger klog.Logger, service *v1.Service, endpointMeta *end func getEndpointSlicePrefix(serviceName string) string { // use the dash (if the name isn't too long) to make the pod name a bit prettier prefix := fmt.Sprintf("%s-", serviceName) - if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 { + if len(apimachineryvalidation.NameIsDNSSubdomain(prefix, true)) != 0 { prefix = serviceName } return prefix @@ -188,27 +184,6 @@ func getSliceToFill(endpointSlices []*discovery.EndpointSlice, numEndpoints, max return closestSlice } -// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action. -func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice { - if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok { - // Enqueue all the services that the pod used to be a member of. - // This is the same thing we do when we add a pod. - return endpointSlice - } - // If we reached here it means the pod was deleted but its final state is unrecorded. - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) - return nil - } - endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice) - if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj)) - return nil - } - return endpointSlice -} - // addTriggerTimeAnnotation adds a triggerTime annotation to an EndpointSlice func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) { if endpointSlice.Annotations == nil { @@ -222,11 +197,11 @@ func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTim } } -// serviceControllerKey returns a controller key for a Service but derived from +// ServiceControllerKey returns a controller key for a Service but derived from // an EndpointSlice. -func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) { +func ServiceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) { if endpointSlice == nil { - return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()") + return "", fmt.Errorf("nil EndpointSlice passed to ServiceControllerKey()") } serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName] if !ok || serviceName == "" { @@ -268,7 +243,7 @@ func setEndpointSliceLabels(logger klog.Logger, epSlice *discovery.EndpointSlice } // add or remove headless label depending on the service Type - if !helper.IsServiceIPSet(service) { + if !isServiceIPSet(service) { svcLabels[v1.IsHeadlessService] = "" } else { delete(svcLabels, v1.IsHeadlessService) @@ -393,25 +368,46 @@ func hintsEnabled(annotations map[string]string) bool { return val == "Auto" || val == "auto" } -// managedByChanged returns true if one of the provided EndpointSlices is +// ManagedByChanged returns true if one of the provided EndpointSlices is // managed by the EndpointSlice controller while the other is not. -func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool { - return managedByController(endpointSlice1) != managedByController(endpointSlice2) +func ManagedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool { + return ManagedByController(endpointSlice1) != ManagedByController(endpointSlice2) } -// managedByController returns true if the controller of the provided +// ManagedByController returns true if the controller of the provided // EndpointSlices is the EndpointSlice controller. -func managedByController(endpointSlice *discovery.EndpointSlice) bool { +func ManagedByController(endpointSlice *discovery.EndpointSlice) bool { managedBy := endpointSlice.Labels[discovery.LabelManagedBy] return managedBy == controllerName } -// isNodeReady returns true if a node is ready; false otherwise. -func isNodeReady(node *v1.Node) bool { - for _, c := range node.Status.Conditions { - if c.Type == v1.NodeReady { - return c.Status == v1.ConditionTrue - } - } - return false +// isServiceIPSet aims to check if the service's ClusterIP is set or not +// the objective is not to perform validation here +// copied from k8s.io/kubernetes/pkg/apis/core/v1/helper +func isServiceIPSet(service *v1.Service) bool { + return service.Spec.ClusterIP != v1.ClusterIPNone && service.Spec.ClusterIP != "" +} + +// findPort locates the container port for the given pod and portName. If the +// targetPort is a number, use that. If the targetPort is a string, look that +// string up in all named ports in all containers in the target pod. If no +// match is found, fail. +// copied from k8s.io/kubernetes/pkg/api/v1/pod +func findPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) { + portName := svcPort.TargetPort + switch portName.Type { + case intstr.String: + name := portName.StrVal + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.Name == name && port.Protocol == svcPort.Protocol { + return int(port.ContainerPort), nil + } + } + } + case intstr.Int: + return portName.IntValue(), nil + } + + return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) } diff --git a/pkg/controller/endpointslice/utils_test.go b/staging/src/k8s.io/endpointslice/utils_test.go similarity index 99% rename from pkg/controller/endpointslice/utils_test.go rename to staging/src/k8s.io/endpointslice/utils_test.go index 5669e0da779..04caafdd0b8 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/staging/src/k8s.io/endpointslice/utils_test.go @@ -479,7 +479,7 @@ func TestServiceControllerKey(t *testing.T) { "nil EndpointSlice": { endpointSlice: nil, expectedKey: "", - expectedErr: fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()"), + expectedErr: fmt.Errorf("nil EndpointSlice passed to ServiceControllerKey()"), }, "empty EndpointSlice": { endpointSlice: &discovery.EndpointSlice{}, @@ -502,7 +502,7 @@ func TestServiceControllerKey(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - actualKey, actualErr := serviceControllerKey(tc.endpointSlice) + actualKey, actualErr := ServiceControllerKey(tc.endpointSlice) if !reflect.DeepEqual(actualErr, tc.expectedErr) { t.Errorf("Expected %s, got %s", tc.expectedErr, actualErr) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 7064533c5c8..1b3d1c8b68f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2061,6 +2061,12 @@ k8s.io/dynamic-resource-allocation/controller k8s.io/dynamic-resource-allocation/kubeletplugin k8s.io/dynamic-resource-allocation/leaderelection k8s.io/dynamic-resource-allocation/resourceclaim +# k8s.io/endpointslice v0.0.0 => ./staging/src/k8s.io/endpointslice +## explicit; go 1.20 +k8s.io/endpointslice +k8s.io/endpointslice/metrics +k8s.io/endpointslice/topologycache +k8s.io/endpointslice/util # k8s.io/gengo v0.0.0-20220902162205-c0856e24416d ## explicit; go 1.13 k8s.io/gengo/args