From 8f9483d827f0e7f2b605a0bce95e11702f1ff9c7 Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Fri, 30 Aug 2019 00:12:15 -0700 Subject: [PATCH] Fixing bugs related to Endpoint Slices This should fix a bug that could break masters when the EndpointSlice feature gate was enabled. This was all tied to how the apiserver creates and manages it's own services and endpoints (or in this case endpoint slices). Consumers of endpoint slices also need to know about the corresponding service. Previously we were trying to set an owner reference here for this purpose, but that came with potential downsides and increased complexity. This commit changes behavior of the apiserver endpointslice integration to set the service name label instead of owner references, and simplifies consumer logic to reference that (both are set by the EndpointSlice controller). Additionally, this should fix a bug with the EndpointSlice GenerateName value that had previously been set with a "." as a suffix. --- cmd/kube-controller-manager/app/BUILD | 1 + cmd/kube-controller-manager/app/discovery.go | 6 ++-- pkg/apis/discovery/validation/validation.go | 8 +++-- pkg/controller/.import-restrictions | 2 ++ pkg/controller/endpointslice/BUILD | 1 + .../endpointslice/endpointslice_controller.go | 7 ++-- .../endpointslice_controller_test.go | 8 ++--- .../endpointslice/reconciler_test.go | 25 +++++++-------- pkg/controller/endpointslice/utils.go | 15 +++++++-- pkg/controller/endpointslice/utils_test.go | 13 ++++---- pkg/master/reconcilers/endpointsadapter.go | 9 +----- .../reconcilers/endpointsadapter_test.go | 3 +- pkg/proxy/endpoints.go | 6 +++- pkg/proxy/endpointslicecache.go | 32 +++++++++++-------- pkg/proxy/endpointslicecache_test.go | 6 ++-- pkg/proxy/iptables/proxier.go | 12 ++++--- pkg/proxy/iptables/proxier_test.go | 7 ++-- pkg/proxy/ipvs/BUILD | 1 + pkg/proxy/ipvs/proxier.go | 12 ++++--- pkg/proxy/ipvs/proxier_test.go | 13 +++++--- .../src/k8s.io/api/discovery/v1alpha1/BUILD | 1 + .../discovery/v1alpha1/well_known_labels.go | 22 +++++++++++++ 22 files changed, 132 insertions(+), 78 deletions(-) create mode 100644 staging/src/k8s.io/api/discovery/v1alpha1/well_known_labels.go diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 0070c5c4603..e1ac4f3b5a7 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -112,6 +112,7 @@ go_library( "//pkg/volume/util:go_default_library", "//pkg/volume/vsphere_volume:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/cmd/kube-controller-manager/app/discovery.go b/cmd/kube-controller-manager/app/discovery.go index c4b0ef5c5a6..7570ea7dde0 100644 --- a/cmd/kube-controller-manager/app/discovery.go +++ b/cmd/kube-controller-manager/app/discovery.go @@ -23,12 +23,14 @@ package app import ( "net/http" - "k8s.io/apimachinery/pkg/runtime/schema" + discoveryv1alpha1 "k8s.io/api/discovery/v1alpha1" + "k8s.io/klog" endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice" ) func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, error) { - if !ctx.AvailableResources[schema.GroupVersionResource{Group: "discovery", Version: "v1alpha1", Resource: "endpointslices"}] { + if !ctx.AvailableResources[discoveryv1alpha1.SchemeGroupVersion.WithResource("endpointslices")] { + klog.Warningf("Not starting endpointslice-controller, discovery.k8s.io/v1alpha1 resources are not available") return nil, false, nil } diff --git a/pkg/apis/discovery/validation/validation.go b/pkg/apis/discovery/validation/validation.go index 50695f68263..329e4e2f82b 100644 --- a/pkg/apis/discovery/validation/validation.go +++ b/pkg/apis/discovery/validation/validation.go @@ -36,10 +36,14 @@ var ( maxEndpoints = 1000 ) +// ValidateEndpointSliceName can be used to check whether the given endpoint +// slice name is valid. Prefix indicates this name will be used as part of +// generation, in which case trailing dashes are allowed. +var ValidateEndpointSliceName = apimachineryvalidation.NameIsDNSSubdomain + // ValidateEndpointSlice validates an EndpointSlice. func ValidateEndpointSlice(endpointSlice *discovery.EndpointSlice) field.ErrorList { - validateEndpointSliceName := apimachineryvalidation.NameIsDNSSubdomain - allErrs := apivalidation.ValidateObjectMeta(&endpointSlice.ObjectMeta, true, validateEndpointSliceName, field.NewPath("metadata")) + allErrs := apivalidation.ValidateObjectMeta(&endpointSlice.ObjectMeta, true, ValidateEndpointSliceName, field.NewPath("metadata")) addrType := discovery.AddressType("") if endpointSlice.AddressType == nil { diff --git a/pkg/controller/.import-restrictions b/pkg/controller/.import-restrictions index 9c1d574230b..896a720510c 100644 --- a/pkg/controller/.import-restrictions +++ b/pkg/controller/.import-restrictions @@ -188,6 +188,8 @@ "k8s.io/kubernetes/pkg/apis/core/v1", "k8s.io/kubernetes/pkg/apis/core/v1/helper", "k8s.io/kubernetes/pkg/apis/core/validation", + "k8s.io/kubernetes/pkg/apis/discovery", + "k8s.io/kubernetes/pkg/apis/discovery/validation", "k8s.io/kubernetes/pkg/cloudprovider", "k8s.io/kubernetes/pkg/cloudprovider/providers/gce", "k8s.io/kubernetes/pkg/controller", diff --git a/pkg/controller/endpointslice/BUILD b/pkg/controller/endpointslice/BUILD index 5d10cc8ab19..16ea4d62af8 100644 --- a/pkg/controller/endpointslice/BUILD +++ b/pkg/controller/endpointslice/BUILD @@ -13,6 +13,7 @@ go_library( deps = [ "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", + "//pkg/apis/discovery/validation:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/util/endpoint:go_default_library", "//pkg/util/hash:go_default_library", diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index d5fef2e1154..e956530dcd3 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -21,6 +21,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -42,10 +43,6 @@ import ( ) const ( - // serviceNameLabel is used to indicate the name of a Kubernetes service - // associated with an EndpointSlice. - serviceNameLabel = "kubernetes.io/service-name" - // maxRetries is the number of times a service will be retried before it is // dropped out of the queue. Any sync error, such as a failure to create or // update an EndpointSlice could trigger a retry. With the current @@ -276,7 +273,7 @@ func (c *Controller) syncService(key string) error { return err } - esLabelSelector := labels.Set(map[string]string{serviceNameLabel: service.Name}).AsSelectorPreValidated() + esLabelSelector := labels.Set(map[string]string{discovery.LabelServiceName: service.Name}).AsSelectorPreValidated() endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector) if err != nil { diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go index 15524eea21f..e8047b625a3 100644 --- a/pkg/controller/endpointslice/endpointslice_controller_test.go +++ b/pkg/controller/endpointslice/endpointslice_controller_test.go @@ -108,7 +108,7 @@ func TestSyncServiceWithSelector(t *testing.T) { assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices") slice := sliceList.Items[0] assert.Regexp(t, "^"+serviceName, slice.Name) - assert.Equal(t, serviceName, slice.Labels[serviceNameLabel]) + assert.Equal(t, serviceName, slice.Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slice.Ports) assert.EqualValues(t, []discovery.Endpoint{}, slice.Endpoints) assert.NotEmpty(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"]) @@ -189,11 +189,11 @@ func TestSyncServiceEndpointSliceSelection(t *testing.T) { // 3 slices, 2 with matching labels for our service endpointSlices := []*discovery.EndpointSlice{{ - ObjectMeta: metav1.ObjectMeta{Name: "matching-1", Namespace: ns, Labels: map[string]string{serviceNameLabel: serviceName}}, + ObjectMeta: metav1.ObjectMeta{Name: "matching-1", Namespace: ns, Labels: map[string]string{discovery.LabelServiceName: serviceName}}, }, { - ObjectMeta: metav1.ObjectMeta{Name: "matching-2", Namespace: ns, Labels: map[string]string{serviceNameLabel: serviceName}}, + ObjectMeta: metav1.ObjectMeta{Name: "matching-2", Namespace: ns, Labels: map[string]string{discovery.LabelServiceName: serviceName}}, }, { - ObjectMeta: metav1.ObjectMeta{Name: "not-matching-1", Namespace: ns, Labels: map[string]string{serviceNameLabel: "something-else"}}, + ObjectMeta: metav1.ObjectMeta{Name: "not-matching-1", Namespace: ns, Labels: map[string]string{discovery.LabelServiceName: "something-else"}}, }} // need to add them to both store and fake clientset diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index dea28337a3b..031f18b0bbd 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1alpha1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,7 +51,7 @@ func TestReconcileEmpty(t *testing.T) { assert.Len(t, slices, 1, "Expected 1 endpoint slices") assert.Regexp(t, "^"+svc.Name, slices[0].Name) - assert.Equal(t, svc.Name, slices[0].Labels[serviceNameLabel]) + assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) } @@ -83,7 +82,7 @@ func TestReconcile1Pod(t *testing.T) { slices := fetchEndpointSlices(t, client, namespace) assert.Len(t, slices, 1, "Expected 1 endpoint slices") assert.Regexp(t, "^"+svc.Name, slices[0].Name) - assert.Equal(t, svc.Name, slices[0].Labels[serviceNameLabel]) + assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.Equal(t, slices[0].Annotations, map[string]string{ "endpoints.kubernetes.io/last-change-trigger-time": triggerTime.Format(time.RFC3339Nano), }) @@ -125,7 +124,7 @@ func TestReconcile1EndpointSlice(t *testing.T) { assert.Len(t, slices, 1, "Expected 1 endpoint slices") assert.Regexp(t, "^"+svc.Name, slices[0].Name) - assert.Equal(t, svc.Name, slices[0].Labels[serviceNameLabel]) + assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) } @@ -362,9 +361,9 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { // ensure that endpoints in each slice will be marked for update. for i, pod := range pods { if i%10 == 0 { - pod.Status.Conditions = []v1.PodCondition{{ - Type: v1.PodReady, - Status: v1.ConditionFalse, + pod.Status.Conditions = []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: corev1.ConditionFalse, }} } } @@ -397,10 +396,10 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { svc := corev1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "named-port-example", Namespace: namespace}, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ TargetPort: portNameIntStr, - Protocol: v1.ProtocolTCP, + Protocol: corev1.ProtocolTCP, }}, Selector: map[string]string{"foo": "bar"}, }, @@ -412,10 +411,10 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { ready := !(i%3 == 0) portOffset := i % 5 pod := newPod(i, namespace, ready, 1) - pod.Spec.Containers[0].Ports = []v1.ContainerPort{{ + pod.Spec.Containers[0].Ports = []corev1.ContainerPort{{ Name: portNameIntStr.StrVal, ContainerPort: int32(8080 + portOffset), - Protocol: v1.ProtocolTCP, + Protocol: corev1.ProtocolTCP, }} pods = append(pods, pod) } @@ -433,7 +432,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { expectUnorderedSlicesWithLengths(t, fetchedSlices, []int{60, 60, 60, 60, 60}) // generate data structures for expected slice ports and address types - protoTCP := v1.ProtocolTCP + protoTCP := corev1.ProtocolTCP ipAddressType := discovery.AddressTypeIP expectedSlices := []discovery.EndpointSlice{} for i := range fetchedSlices { diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index ff093d78e64..7de30bd2260 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -32,6 +32,7 @@ import ( "k8s.io/klog" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/apis/discovery/validation" "k8s.io/kubernetes/pkg/util/hash" ) @@ -158,8 +159,8 @@ func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *disc ownerRef := metav1.NewControllerRef(service, gvk) return &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{serviceNameLabel: service.Name}, - GenerateName: fmt.Sprintf("%s.", service.Name), + Labels: map[string]string{discovery.LabelServiceName: service.Name}, + GenerateName: getEndpointSlicePrefix(service.Name), OwnerReferences: []metav1.OwnerReference{*ownerRef}, Namespace: service.Namespace, }, @@ -169,6 +170,16 @@ func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *disc } } +// getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name. +func getEndpointSlicePrefix(serviceName string) string { + // use the dash (if the name isn't too long) to make the pod name a bit prettier + prefix := fmt.Sprintf("%s-", serviceName) + if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 { + prefix = serviceName + } + return prefix +} + // boolPtrChanged returns true if a set of bool pointers have different values. func boolPtrChanged(ptr1, ptr2 *bool) bool { if (ptr1 == nil) != (ptr2 == nil) { diff --git a/pkg/controller/endpointslice/utils_test.go b/pkg/controller/endpointslice/utils_test.go index c14269b833e..da482f77d9d 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/pkg/controller/endpointslice/utils_test.go @@ -22,7 +22,6 @@ import ( "time" "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -58,8 +57,8 @@ func TestNewEndpointSlice(t *testing.T) { expectedSlice := discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{serviceNameLabel: service.Name}, - GenerateName: fmt.Sprintf("%s.", service.Name), + Labels: map[string]string{discovery.LabelServiceName: service.Name}, + GenerateName: fmt.Sprintf("%s-", service.Name), OwnerReferences: []metav1.OwnerReference{*ownerRef}, Namespace: service.Namespace, }, @@ -81,7 +80,7 @@ func TestPodToEndpoint(t *testing.T) { multiIPPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}} - node1 := &corev1.Node{ + node1 := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: readyPod.Spec.NodeName, Labels: map[string]string{ @@ -288,14 +287,14 @@ func newClientset() *fake.Clientset { return client } -func newServiceAndendpointMeta(name, namespace string) (corev1.Service, endpointMeta) { +func newServiceAndendpointMeta(name, namespace string) (v1.Service, endpointMeta) { portNum := int32(80) portNameIntStr := intstr.IntOrString{ Type: intstr.Int, IntVal: portNum, } - svc := corev1.Service{ + svc := v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{ @@ -317,7 +316,7 @@ func newServiceAndendpointMeta(name, namespace string) (corev1.Service, endpoint return svc, endpointMeta } -func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, svc corev1.Service) *discovery.EndpointSlice { +func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, svc v1.Service) *discovery.EndpointSlice { return &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s.%d", svc.Name, n), diff --git a/pkg/master/reconcilers/endpointsadapter.go b/pkg/master/reconcilers/endpointsadapter.go index 5eee2fca15c..19fb7afb4e7 100644 --- a/pkg/master/reconcilers/endpointsadapter.go +++ b/pkg/master/reconcilers/endpointsadapter.go @@ -25,12 +25,6 @@ import ( discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1" ) -const ( - // serviceNameLabel is used to indicate the name of a Kubernetes service - // associated with an EndpointSlice. - serviceNameLabel = "kubernetes.io/service-name" -) - // EndpointsAdapter provides a simple interface for reading and writing both // Endpoints and Endpoint Slices. // NOTE: This is an incomplete adapter implementation that is only suitable for @@ -103,8 +97,7 @@ func (adapter *EndpointsAdapter) ensureEndpointSliceFromEndpoints(namespace stri func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice { endpointSlice := &discovery.EndpointSlice{} endpointSlice.Name = endpoints.Name - endpointSlice.Labels = map[string]string{serviceNameLabel: endpoints.Name} - endpointSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: endpoints.Name}} + endpointSlice.Labels = map[string]string{discovery.LabelServiceName: endpoints.Name} ipAddressType := discovery.AddressTypeIP endpointSlice.AddressType = &ipAddressType diff --git a/pkg/master/reconcilers/endpointsadapter_test.go b/pkg/master/reconcilers/endpointsadapter_test.go index f751866a962..188dfb63f1a 100644 --- a/pkg/master/reconcilers/endpointsadapter_test.go +++ b/pkg/master/reconcilers/endpointsadapter_test.go @@ -294,8 +294,7 @@ func generateEndpointsAndSlice(name, namespace string, ports []int, addresses [] addressTypeIP := discovery.AddressTypeIP epSlice := &discovery.EndpointSlice{ObjectMeta: objectMeta, AddressType: &addressTypeIP} - epSlice.Labels = map[string]string{serviceNameLabel: name} - epSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: name}} + epSlice.Labels = map[string]string{discovery.LabelServiceName: name} subset := corev1.EndpointSubset{} for i, port := range ports { diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 9bf1bdd4a25..e78f9290875 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -179,7 +179,11 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E return false } - namespacedName, _ := endpointSliceCacheKeys(endpointSlice) + namespacedName, _, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return false + } metrics.EndpointChangesTotal.Inc() diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 229b7744424..9b81a647624 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -17,6 +17,7 @@ limitations under the License. package proxy import ( + "fmt" "sort" "k8s.io/api/core/v1" @@ -79,12 +80,12 @@ func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint { // Update a slice in the cache. func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) { - serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice) - // This should never actually happen - if serviceKey.Name == "" || serviceKey.Namespace == "" || sliceKey == "" { - klog.Errorf("Invalid endpoint slice, name and owner reference required %v", endpointSlice) + serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) return } + esInfo := &endpointSliceInfo{ Ports: endpointSlice.Ports, Endpoints: []*endpointInfo{}, @@ -105,7 +106,11 @@ func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) // Delete a slice from the cache. func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) { - serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice) + serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return + } delete(cache.sliceByServiceMap[serviceKey], sliceKey) } @@ -214,16 +219,15 @@ func formatEndpointsList(endpoints []Endpoint) []string { } // endpointSliceCacheKeys returns cache keys used for a given EndpointSlice. -func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string) { - if len(endpointSlice.OwnerReferences) == 0 { - klog.Errorf("No owner reference set on endpoint slice: %s", endpointSlice.Name) - return types.NamespacedName{}, endpointSlice.Name +func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string, error) { + var err error + serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName] + if !ok || serviceName == "" { + err = fmt.Errorf("No %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name) + } else if endpointSlice.Namespace == "" || endpointSlice.Name == "" { + err = fmt.Errorf("Expected EndpointSlice name and namespace to be set: %v", endpointSlice) } - if len(endpointSlice.OwnerReferences) > 1 { - klog.Errorf("More than 1 owner reference set on endpoint slice: %s", endpointSlice.Name) - } - ownerRef := endpointSlice.OwnerReferences[0] - return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: ownerRef.Name}, endpointSlice.Name + return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err } // byIP helps sort endpoints by IP diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 450f2d083c4..73dcc683692 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -200,9 +200,9 @@ func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, of ipAddressType := discovery.AddressTypeIP endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%d", serviceName, sliceNum), - Namespace: namespace, - OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + Name: fmt.Sprintf("%s-%d", serviceName, sliceNum), + Namespace: namespace, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{}, AddressType: &ipAddressType, diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 5b4741b9d4c..0f84fd8a899 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -505,7 +505,11 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true - proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced) + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + proxier.setInitialized(proxier.endpointSlicesSynced) + } else { + proxier.setInitialized(proxier.endpointsSynced) + } proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -537,7 +541,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsSynced() { proxier.mu.Lock() proxier.endpointsSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -573,7 +577,7 @@ func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointS func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.mu.Lock() proxier.endpointSlicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -675,7 +679,7 @@ func (proxier *Proxier) syncProxyRules() { defer proxier.mu.Unlock() // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || !proxier.servicesSynced { + if !proxier.isInitialized() { klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 41be026e368..15f9a1bf362 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -390,6 +390,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro nodePortAddresses: make([]string, 0), networkInterfacer: utilproxytest.NewFakeNetwork(), } + p.setInitialized(true) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) return p } @@ -2335,9 +2336,9 @@ COMMIT ipAddressType := discovery.AddressTypeIP endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-1", serviceName), - Namespace: namespaceName, - OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: utilpointer.StringPtr(""), diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 0da921e54c6..ff2c1ab0c16 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -19,6 +19,7 @@ go_test( "//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/proxy/util/testing:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/ipset:go_default_library", "//pkg/util/ipset/testing:go_default_library", "//pkg/util/iptables:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index c344806c3fc..9fc9d259226 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -819,7 +819,11 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true - proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced) + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + proxier.setInitialized(proxier.endpointSlicesSynced) + } else { + proxier.setInitialized(proxier.endpointsSynced) + } proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -847,7 +851,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsSynced() { proxier.mu.Lock() proxier.endpointsSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -883,7 +887,7 @@ func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointS func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.mu.Lock() proxier.endpointSlicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -900,7 +904,7 @@ func (proxier *Proxier) syncProxyRules() { defer proxier.mu.Unlock() // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || !proxier.servicesSynced { + if !proxier.isInitialized() { klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master") return } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 324bcf26fc2..ac9f1a19a92 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -24,6 +24,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -36,6 +37,7 @@ import ( netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" + "k8s.io/kubernetes/pkg/util/async" utilipset "k8s.io/kubernetes/pkg/util/ipset" ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -134,7 +136,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u for _, is := range ipsetInfo { ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment) } - return &Proxier{ + p := &Proxier{ exec: fexec, serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), @@ -164,6 +166,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u networkInterfacer: proxyutiltest.NewFakeNetwork(), gracefuldeleteManager: NewGracefulTerminationManager(ipvs), } + p.setInitialized(true) + p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) + return p } func makeNSN(namespace, name string) types.NamespacedName { @@ -3660,9 +3665,9 @@ func TestEndpointSliceE2E(t *testing.T) { ipAddressType := discovery.AddressTypeIP endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-1", serviceName), - Namespace: namespaceName, - OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: utilpointer.StringPtr(""), diff --git a/staging/src/k8s.io/api/discovery/v1alpha1/BUILD b/staging/src/k8s.io/api/discovery/v1alpha1/BUILD index cc76727f871..283479e10c0 100644 --- a/staging/src/k8s.io/api/discovery/v1alpha1/BUILD +++ b/staging/src/k8s.io/api/discovery/v1alpha1/BUILD @@ -8,6 +8,7 @@ go_library( "register.go", "types.go", "types_swagger_doc_generated.go", + "well_known_labels.go", "zz_generated.deepcopy.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/api/discovery/v1alpha1", diff --git a/staging/src/k8s.io/api/discovery/v1alpha1/well_known_labels.go b/staging/src/k8s.io/api/discovery/v1alpha1/well_known_labels.go new file mode 100644 index 00000000000..850cd205925 --- /dev/null +++ b/staging/src/k8s.io/api/discovery/v1alpha1/well_known_labels.go @@ -0,0 +1,22 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + // LabelServiceName is used to indicate the name of a Kubernetes service. + LabelServiceName = "kubernetes.io/service-name" +)