diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 0070c5c4603..e1ac4f3b5a7 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -112,6 +112,7 @@ go_library( "//pkg/volume/util:go_default_library", "//pkg/volume/vsphere_volume:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/cmd/kube-controller-manager/app/discovery.go b/cmd/kube-controller-manager/app/discovery.go index c4b0ef5c5a6..7570ea7dde0 100644 --- a/cmd/kube-controller-manager/app/discovery.go +++ b/cmd/kube-controller-manager/app/discovery.go @@ -23,12 +23,14 @@ package app import ( "net/http" - "k8s.io/apimachinery/pkg/runtime/schema" + discoveryv1alpha1 "k8s.io/api/discovery/v1alpha1" + "k8s.io/klog" endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice" ) func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, error) { - if !ctx.AvailableResources[schema.GroupVersionResource{Group: "discovery", Version: "v1alpha1", Resource: "endpointslices"}] { + if !ctx.AvailableResources[discoveryv1alpha1.SchemeGroupVersion.WithResource("endpointslices")] { + klog.Warningf("Not starting endpointslice-controller, discovery.k8s.io/v1alpha1 resources are not available") return nil, false, nil } diff --git a/pkg/apis/discovery/validation/validation.go b/pkg/apis/discovery/validation/validation.go index 50695f68263..329e4e2f82b 100644 --- a/pkg/apis/discovery/validation/validation.go +++ b/pkg/apis/discovery/validation/validation.go @@ -36,10 +36,14 @@ var ( maxEndpoints = 1000 ) +// ValidateEndpointSliceName can be used to check whether the given endpoint +// slice name is valid. Prefix indicates this name will be used as part of +// generation, in which case trailing dashes are allowed. +var ValidateEndpointSliceName = apimachineryvalidation.NameIsDNSSubdomain + // ValidateEndpointSlice validates an EndpointSlice. func ValidateEndpointSlice(endpointSlice *discovery.EndpointSlice) field.ErrorList { - validateEndpointSliceName := apimachineryvalidation.NameIsDNSSubdomain - allErrs := apivalidation.ValidateObjectMeta(&endpointSlice.ObjectMeta, true, validateEndpointSliceName, field.NewPath("metadata")) + allErrs := apivalidation.ValidateObjectMeta(&endpointSlice.ObjectMeta, true, ValidateEndpointSliceName, field.NewPath("metadata")) addrType := discovery.AddressType("") if endpointSlice.AddressType == nil { diff --git a/pkg/controller/.import-restrictions b/pkg/controller/.import-restrictions index 9c1d574230b..896a720510c 100644 --- a/pkg/controller/.import-restrictions +++ b/pkg/controller/.import-restrictions @@ -188,6 +188,8 @@ "k8s.io/kubernetes/pkg/apis/core/v1", "k8s.io/kubernetes/pkg/apis/core/v1/helper", "k8s.io/kubernetes/pkg/apis/core/validation", + "k8s.io/kubernetes/pkg/apis/discovery", + "k8s.io/kubernetes/pkg/apis/discovery/validation", "k8s.io/kubernetes/pkg/cloudprovider", "k8s.io/kubernetes/pkg/cloudprovider/providers/gce", "k8s.io/kubernetes/pkg/controller", diff --git a/pkg/controller/endpointslice/BUILD b/pkg/controller/endpointslice/BUILD index 5d10cc8ab19..16ea4d62af8 100644 --- a/pkg/controller/endpointslice/BUILD +++ b/pkg/controller/endpointslice/BUILD @@ -13,6 +13,7 @@ go_library( deps = [ "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", + "//pkg/apis/discovery/validation:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/util/endpoint:go_default_library", "//pkg/util/hash:go_default_library", diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index d5fef2e1154..e956530dcd3 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -21,6 +21,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -42,10 +43,6 @@ import ( ) const ( - // serviceNameLabel is used to indicate the name of a Kubernetes service - // associated with an EndpointSlice. - serviceNameLabel = "kubernetes.io/service-name" - // maxRetries is the number of times a service will be retried before it is // dropped out of the queue. Any sync error, such as a failure to create or // update an EndpointSlice could trigger a retry. With the current @@ -276,7 +273,7 @@ func (c *Controller) syncService(key string) error { return err } - esLabelSelector := labels.Set(map[string]string{serviceNameLabel: service.Name}).AsSelectorPreValidated() + esLabelSelector := labels.Set(map[string]string{discovery.LabelServiceName: service.Name}).AsSelectorPreValidated() endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector) if err != nil { diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go index 15524eea21f..e8047b625a3 100644 --- a/pkg/controller/endpointslice/endpointslice_controller_test.go +++ b/pkg/controller/endpointslice/endpointslice_controller_test.go @@ -108,7 +108,7 @@ func TestSyncServiceWithSelector(t *testing.T) { assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices") slice := sliceList.Items[0] assert.Regexp(t, "^"+serviceName, slice.Name) - assert.Equal(t, serviceName, slice.Labels[serviceNameLabel]) + assert.Equal(t, serviceName, slice.Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slice.Ports) assert.EqualValues(t, []discovery.Endpoint{}, slice.Endpoints) assert.NotEmpty(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"]) @@ -189,11 +189,11 @@ func TestSyncServiceEndpointSliceSelection(t *testing.T) { // 3 slices, 2 with matching labels for our service endpointSlices := []*discovery.EndpointSlice{{ - ObjectMeta: metav1.ObjectMeta{Name: "matching-1", Namespace: ns, Labels: map[string]string{serviceNameLabel: serviceName}}, + ObjectMeta: metav1.ObjectMeta{Name: "matching-1", Namespace: ns, Labels: map[string]string{discovery.LabelServiceName: serviceName}}, }, { - ObjectMeta: metav1.ObjectMeta{Name: "matching-2", Namespace: ns, Labels: map[string]string{serviceNameLabel: serviceName}}, + ObjectMeta: metav1.ObjectMeta{Name: "matching-2", Namespace: ns, Labels: map[string]string{discovery.LabelServiceName: serviceName}}, }, { - ObjectMeta: metav1.ObjectMeta{Name: "not-matching-1", Namespace: ns, Labels: map[string]string{serviceNameLabel: "something-else"}}, + ObjectMeta: metav1.ObjectMeta{Name: "not-matching-1", Namespace: ns, Labels: map[string]string{discovery.LabelServiceName: "something-else"}}, }} // need to add them to both store and fake clientset diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index dea28337a3b..031f18b0bbd 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1alpha1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,7 +51,7 @@ func TestReconcileEmpty(t *testing.T) { assert.Len(t, slices, 1, "Expected 1 endpoint slices") assert.Regexp(t, "^"+svc.Name, slices[0].Name) - assert.Equal(t, svc.Name, slices[0].Labels[serviceNameLabel]) + assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) } @@ -83,7 +82,7 @@ func TestReconcile1Pod(t *testing.T) { slices := fetchEndpointSlices(t, client, namespace) assert.Len(t, slices, 1, "Expected 1 endpoint slices") assert.Regexp(t, "^"+svc.Name, slices[0].Name) - assert.Equal(t, svc.Name, slices[0].Labels[serviceNameLabel]) + assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.Equal(t, slices[0].Annotations, map[string]string{ "endpoints.kubernetes.io/last-change-trigger-time": triggerTime.Format(time.RFC3339Nano), }) @@ -125,7 +124,7 @@ func TestReconcile1EndpointSlice(t *testing.T) { assert.Len(t, slices, 1, "Expected 1 endpoint slices") assert.Regexp(t, "^"+svc.Name, slices[0].Name) - assert.Equal(t, svc.Name, slices[0].Labels[serviceNameLabel]) + assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) } @@ -362,9 +361,9 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { // ensure that endpoints in each slice will be marked for update. for i, pod := range pods { if i%10 == 0 { - pod.Status.Conditions = []v1.PodCondition{{ - Type: v1.PodReady, - Status: v1.ConditionFalse, + pod.Status.Conditions = []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: corev1.ConditionFalse, }} } } @@ -397,10 +396,10 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { svc := corev1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "named-port-example", Namespace: namespace}, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ TargetPort: portNameIntStr, - Protocol: v1.ProtocolTCP, + Protocol: corev1.ProtocolTCP, }}, Selector: map[string]string{"foo": "bar"}, }, @@ -412,10 +411,10 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { ready := !(i%3 == 0) portOffset := i % 5 pod := newPod(i, namespace, ready, 1) - pod.Spec.Containers[0].Ports = []v1.ContainerPort{{ + pod.Spec.Containers[0].Ports = []corev1.ContainerPort{{ Name: portNameIntStr.StrVal, ContainerPort: int32(8080 + portOffset), - Protocol: v1.ProtocolTCP, + Protocol: corev1.ProtocolTCP, }} pods = append(pods, pod) } @@ -433,7 +432,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { expectUnorderedSlicesWithLengths(t, fetchedSlices, []int{60, 60, 60, 60, 60}) // generate data structures for expected slice ports and address types - protoTCP := v1.ProtocolTCP + protoTCP := corev1.ProtocolTCP ipAddressType := discovery.AddressTypeIP expectedSlices := []discovery.EndpointSlice{} for i := range fetchedSlices { diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index ff093d78e64..7de30bd2260 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -32,6 +32,7 @@ import ( "k8s.io/klog" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/apis/discovery/validation" "k8s.io/kubernetes/pkg/util/hash" ) @@ -158,8 +159,8 @@ func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *disc ownerRef := metav1.NewControllerRef(service, gvk) return &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{serviceNameLabel: service.Name}, - GenerateName: fmt.Sprintf("%s.", service.Name), + Labels: map[string]string{discovery.LabelServiceName: service.Name}, + GenerateName: getEndpointSlicePrefix(service.Name), OwnerReferences: []metav1.OwnerReference{*ownerRef}, Namespace: service.Namespace, }, @@ -169,6 +170,16 @@ func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *disc } } +// getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name. +func getEndpointSlicePrefix(serviceName string) string { + // use the dash (if the name isn't too long) to make the pod name a bit prettier + prefix := fmt.Sprintf("%s-", serviceName) + if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 { + prefix = serviceName + } + return prefix +} + // boolPtrChanged returns true if a set of bool pointers have different values. func boolPtrChanged(ptr1, ptr2 *bool) bool { if (ptr1 == nil) != (ptr2 == nil) { diff --git a/pkg/controller/endpointslice/utils_test.go b/pkg/controller/endpointslice/utils_test.go index c14269b833e..da482f77d9d 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/pkg/controller/endpointslice/utils_test.go @@ -22,7 +22,6 @@ import ( "time" "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -58,8 +57,8 @@ func TestNewEndpointSlice(t *testing.T) { expectedSlice := discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{serviceNameLabel: service.Name}, - GenerateName: fmt.Sprintf("%s.", service.Name), + Labels: map[string]string{discovery.LabelServiceName: service.Name}, + GenerateName: fmt.Sprintf("%s-", service.Name), OwnerReferences: []metav1.OwnerReference{*ownerRef}, Namespace: service.Namespace, }, @@ -81,7 +80,7 @@ func TestPodToEndpoint(t *testing.T) { multiIPPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}} - node1 := &corev1.Node{ + node1 := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: readyPod.Spec.NodeName, Labels: map[string]string{ @@ -288,14 +287,14 @@ func newClientset() *fake.Clientset { return client } -func newServiceAndendpointMeta(name, namespace string) (corev1.Service, endpointMeta) { +func newServiceAndendpointMeta(name, namespace string) (v1.Service, endpointMeta) { portNum := int32(80) portNameIntStr := intstr.IntOrString{ Type: intstr.Int, IntVal: portNum, } - svc := corev1.Service{ + svc := v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{ @@ -317,7 +316,7 @@ func newServiceAndendpointMeta(name, namespace string) (corev1.Service, endpoint return svc, endpointMeta } -func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, svc corev1.Service) *discovery.EndpointSlice { +func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, svc v1.Service) *discovery.EndpointSlice { return &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s.%d", svc.Name, n), diff --git a/pkg/master/reconcilers/endpointsadapter.go b/pkg/master/reconcilers/endpointsadapter.go index 5eee2fca15c..19fb7afb4e7 100644 --- a/pkg/master/reconcilers/endpointsadapter.go +++ b/pkg/master/reconcilers/endpointsadapter.go @@ -25,12 +25,6 @@ import ( discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1" ) -const ( - // serviceNameLabel is used to indicate the name of a Kubernetes service - // associated with an EndpointSlice. - serviceNameLabel = "kubernetes.io/service-name" -) - // EndpointsAdapter provides a simple interface for reading and writing both // Endpoints and Endpoint Slices. // NOTE: This is an incomplete adapter implementation that is only suitable for @@ -103,8 +97,7 @@ func (adapter *EndpointsAdapter) ensureEndpointSliceFromEndpoints(namespace stri func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice { endpointSlice := &discovery.EndpointSlice{} endpointSlice.Name = endpoints.Name - endpointSlice.Labels = map[string]string{serviceNameLabel: endpoints.Name} - endpointSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: endpoints.Name}} + endpointSlice.Labels = map[string]string{discovery.LabelServiceName: endpoints.Name} ipAddressType := discovery.AddressTypeIP endpointSlice.AddressType = &ipAddressType diff --git a/pkg/master/reconcilers/endpointsadapter_test.go b/pkg/master/reconcilers/endpointsadapter_test.go index f751866a962..188dfb63f1a 100644 --- a/pkg/master/reconcilers/endpointsadapter_test.go +++ b/pkg/master/reconcilers/endpointsadapter_test.go @@ -294,8 +294,7 @@ func generateEndpointsAndSlice(name, namespace string, ports []int, addresses [] addressTypeIP := discovery.AddressTypeIP epSlice := &discovery.EndpointSlice{ObjectMeta: objectMeta, AddressType: &addressTypeIP} - epSlice.Labels = map[string]string{serviceNameLabel: name} - epSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: name}} + epSlice.Labels = map[string]string{discovery.LabelServiceName: name} subset := corev1.EndpointSubset{} for i, port := range ports { diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 9bf1bdd4a25..e78f9290875 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -179,7 +179,11 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E return false } - namespacedName, _ := endpointSliceCacheKeys(endpointSlice) + namespacedName, _, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return false + } metrics.EndpointChangesTotal.Inc() diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 229b7744424..9b81a647624 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -17,6 +17,7 @@ limitations under the License. package proxy import ( + "fmt" "sort" "k8s.io/api/core/v1" @@ -79,12 +80,12 @@ func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint { // Update a slice in the cache. func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) { - serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice) - // This should never actually happen - if serviceKey.Name == "" || serviceKey.Namespace == "" || sliceKey == "" { - klog.Errorf("Invalid endpoint slice, name and owner reference required %v", endpointSlice) + serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) return } + esInfo := &endpointSliceInfo{ Ports: endpointSlice.Ports, Endpoints: []*endpointInfo{}, @@ -105,7 +106,11 @@ func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) // Delete a slice from the cache. func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) { - serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice) + serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return + } delete(cache.sliceByServiceMap[serviceKey], sliceKey) } @@ -214,16 +219,15 @@ func formatEndpointsList(endpoints []Endpoint) []string { } // endpointSliceCacheKeys returns cache keys used for a given EndpointSlice. -func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string) { - if len(endpointSlice.OwnerReferences) == 0 { - klog.Errorf("No owner reference set on endpoint slice: %s", endpointSlice.Name) - return types.NamespacedName{}, endpointSlice.Name +func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string, error) { + var err error + serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName] + if !ok || serviceName == "" { + err = fmt.Errorf("No %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name) + } else if endpointSlice.Namespace == "" || endpointSlice.Name == "" { + err = fmt.Errorf("Expected EndpointSlice name and namespace to be set: %v", endpointSlice) } - if len(endpointSlice.OwnerReferences) > 1 { - klog.Errorf("More than 1 owner reference set on endpoint slice: %s", endpointSlice.Name) - } - ownerRef := endpointSlice.OwnerReferences[0] - return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: ownerRef.Name}, endpointSlice.Name + return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err } // byIP helps sort endpoints by IP diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 450f2d083c4..73dcc683692 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -200,9 +200,9 @@ func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, of ipAddressType := discovery.AddressTypeIP endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%d", serviceName, sliceNum), - Namespace: namespace, - OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + Name: fmt.Sprintf("%s-%d", serviceName, sliceNum), + Namespace: namespace, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{}, AddressType: &ipAddressType, diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index c537d5bfae7..5d0afa016ed 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -505,7 +505,11 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true - proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced) + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + proxier.setInitialized(proxier.endpointSlicesSynced) + } else { + proxier.setInitialized(proxier.endpointsSynced) + } proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -537,7 +541,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsSynced() { proxier.mu.Lock() proxier.endpointsSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -573,7 +577,7 @@ func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointS func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.mu.Lock() proxier.endpointSlicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -675,7 +679,7 @@ func (proxier *Proxier) syncProxyRules() { defer proxier.mu.Unlock() // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || !proxier.servicesSynced { + if !proxier.isInitialized() { klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 407461a5ac3..86973ef3fb2 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -390,6 +390,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro nodePortAddresses: make([]string, 0), networkInterfacer: utilproxytest.NewFakeNetwork(), } + p.setInitialized(true) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) return p } @@ -2363,9 +2364,9 @@ COMMIT ipAddressType := discovery.AddressTypeIP endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-1", serviceName), - Namespace: namespaceName, - OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: utilpointer.StringPtr(""), diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 0da921e54c6..ff2c1ab0c16 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -19,6 +19,7 @@ go_test( "//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/proxy/util/testing:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/ipset:go_default_library", "//pkg/util/ipset/testing:go_default_library", "//pkg/util/iptables:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 1f6ed8e81c5..e58782e4b82 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -820,7 +820,11 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true - proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced) + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + proxier.setInitialized(proxier.endpointSlicesSynced) + } else { + proxier.setInitialized(proxier.endpointsSynced) + } proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -848,7 +852,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsSynced() { proxier.mu.Lock() proxier.endpointsSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -884,7 +888,7 @@ func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointS func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.mu.Lock() proxier.endpointSlicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced) + proxier.setInitialized(proxier.servicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -901,7 +905,7 @@ func (proxier *Proxier) syncProxyRules() { defer proxier.mu.Unlock() // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || !proxier.servicesSynced { + if !proxier.isInitialized() { klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master") return } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 2d710a257c5..1885cb95d07 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -24,6 +24,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -36,6 +37,7 @@ import ( netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" + "k8s.io/kubernetes/pkg/util/async" utilipset "k8s.io/kubernetes/pkg/util/ipset" ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -134,7 +136,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u for _, is := range ipsetInfo { ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment) } - return &Proxier{ + p := &Proxier{ exec: fexec, serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), @@ -164,6 +166,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u networkInterfacer: proxyutiltest.NewFakeNetwork(), gracefuldeleteManager: NewGracefulTerminationManager(ipvs), } + p.setInitialized(true) + p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) + return p } func makeNSN(namespace, name string) types.NamespacedName { @@ -3690,9 +3695,9 @@ func TestEndpointSliceE2E(t *testing.T) { ipAddressType := discovery.AddressTypeIP endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-1", serviceName), - Namespace: namespaceName, - OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: utilpointer.StringPtr(""), diff --git a/staging/src/k8s.io/api/discovery/v1alpha1/BUILD b/staging/src/k8s.io/api/discovery/v1alpha1/BUILD index cc76727f871..283479e10c0 100644 --- a/staging/src/k8s.io/api/discovery/v1alpha1/BUILD +++ b/staging/src/k8s.io/api/discovery/v1alpha1/BUILD @@ -8,6 +8,7 @@ go_library( "register.go", "types.go", "types_swagger_doc_generated.go", + "well_known_labels.go", "zz_generated.deepcopy.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/api/discovery/v1alpha1", diff --git a/staging/src/k8s.io/api/discovery/v1alpha1/well_known_labels.go b/staging/src/k8s.io/api/discovery/v1alpha1/well_known_labels.go new file mode 100644 index 00000000000..850cd205925 --- /dev/null +++ b/staging/src/k8s.io/api/discovery/v1alpha1/well_known_labels.go @@ -0,0 +1,22 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + // LabelServiceName is used to indicate the name of a Kubernetes service. + LabelServiceName = "kubernetes.io/service-name" +)