diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 2c91d491911..b090d676223 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -472,9 +472,18 @@ func (e *EndpointController) syncService(key string) error { createEndpoints := len(currentEndpoints.ResourceVersion) == 0 + // Compare the sorted subsets and labels + // Remove the HeadlessService label from the endpoints if it exists, + // as this won't be set on the service itself + // and will cause a false negative in this diff check. + // But first check if it has that label to avoid expensive copies. + compareLabels := currentEndpoints.Labels + if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok { + compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService) + } if !createEndpoints && apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) && - apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) { + apiequality.Semantic.DeepEqual(compareLabels, service.Labels) { klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) return nil } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 705c10b6e0c..1b3e15948ca 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -383,6 +383,33 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } +func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) { + ns := metav1.NamespaceDefault + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL, 0*time.Second) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, + }, + Subsets: []v1.EndpointSubset{}, + }) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80}}, + }, + }) + endpoints.syncService(ns + "/foo") + endpointsHandler.ValidateRequestCount(t, 0) +} + func TestSyncEndpointsProtocolUDP(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) diff --git a/pkg/registry/core/endpoint/BUILD b/pkg/registry/core/endpoint/BUILD index 9a88912bced..76bd3e14fb6 100644 --- a/pkg/registry/core/endpoint/BUILD +++ b/pkg/registry/core/endpoint/BUILD @@ -13,7 +13,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/registry/core/endpoint", deps = [ - "//pkg/api/endpoints:go_default_library", "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/apis/core/validation:go_default_library", diff --git a/pkg/registry/core/endpoint/strategy.go b/pkg/registry/core/endpoint/strategy.go index af8d1233f7e..9feb751ae84 100644 --- a/pkg/registry/core/endpoint/strategy.go +++ b/pkg/registry/core/endpoint/strategy.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apiserver/pkg/storage/names" - endptspkg "k8s.io/kubernetes/pkg/api/endpoints" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/validation" @@ -60,8 +59,6 @@ func (endpointsStrategy) Validate(ctx context.Context, obj runtime.Object) field // Canonicalize normalizes the object after validation. func (endpointsStrategy) Canonicalize(obj runtime.Object) { - endpoints := obj.(*api.Endpoints) - endpoints.Subsets = endptspkg.RepackSubsets(endpoints.Subsets) } // AllowCreateOnUpdate is true for endpoints. diff --git a/test/integration/BUILD b/test/integration/BUILD index db7c700523b..baeeece5b5c 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -50,6 +50,7 @@ filegroup( "//test/integration/deployment:all-srcs", "//test/integration/disruption:all-srcs", "//test/integration/dryrun:all-srcs", + "//test/integration/endpoints:all-srcs", "//test/integration/endpointslice:all-srcs", "//test/integration/etcd:all-srcs", "//test/integration/events:all-srcs", diff --git a/test/integration/endpoints/BUILD b/test/integration/endpoints/BUILD new file mode 100644 index 00000000000..89b1f72994a --- /dev/null +++ b/test/integration/endpoints/BUILD @@ -0,0 +1,35 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "go_default_test", + srcs = [ + "endpoints_test.go", + "main_test.go", + ], + tags = ["integration"], + deps = [ + "//pkg/controller/endpoint:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//test/integration/framework:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/test/integration/endpoints/endpoints_test.go b/test/integration/endpoints/endpoints_test.go new file mode 100644 index 00000000000..2f28f59af82 --- /dev/null +++ b/test/integration/endpoints/endpoints_test.go @@ -0,0 +1,180 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoints + +import ( + "context" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/test/integration/framework" +) + +func TestEndpointUpdates(t *testing.T) { + masterConfig := framework.NewIntegrationTestMasterConfig() + _, server, closeFn := framework.RunAMaster(masterConfig) + defer closeFn() + + config := restclient.Config{Host: server.URL} + client, err := clientset.NewForConfig(&config) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + informers := informers.NewSharedInformerFactory(client, 0) + + epController := endpoint.NewEndpointController( + informers.Core().V1().Pods(), + informers.Core().V1().Services(), + informers.Core().V1().Endpoints(), + client, + 0) + + // Start informer and controllers + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + go epController.Run(1, stopCh) + + // Create namespace + ns := framework.CreateTestingNamespace("test-endpoints-updates", server, t) + defer framework.DeleteTestingNamespace(ns, server, t) + + // Create a pod with labels + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: ns.Name, + Labels: labelMap(), + }, + Spec: v1.PodSpec{ + NodeName: "fakenode", + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + } + + createdPod, err := client.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create pod %s: %v", pod.Name, err) + } + + // Set pod IPs + createdPod.Status = v1.PodStatus{ + Phase: v1.PodRunning, + PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}}, + } + _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), createdPod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err) + } + + // Create a service associated to the pod + svc := newService(ns.Name, "foo1") + svc1, err := client.CoreV1().Services(ns.Name).Create(context.TODO(), svc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create service %s: %v", svc.Name, err) + } + + // Obtain ResourceVersion of the new endpoint created + var resVersion string + if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { + endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{}) + if err != nil { + t.Logf("error fetching endpoints: %v", err) + return false, nil + } + resVersion = endpoints.ObjectMeta.ResourceVersion + return true, nil + }); err != nil { + t.Fatalf("endpoints not found: %v", err) + } + + // Force recomputation on the endpoint controller + svc1.SetAnnotations(map[string]string{"foo": "bar"}) + _, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), svc1, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update service %s: %v", svc1.Name, err) + } + + // Create a new service and wait until it has been processed, + // this way we can be sure that the endpoint for the original service + // was recomputed before asserting, since we only have 1 worker + // in the endpoint controller + svc2 := newService(ns.Name, "foo2") + _, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), svc2, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create service %s: %v", svc.Name, err) + } + + if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { + _, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc2.Name, metav1.GetOptions{}) + if err != nil { + t.Logf("error fetching endpoints: %v", err) + return false, nil + } + return true, nil + }); err != nil { + t.Fatalf("endpoints not found: %v", err) + } + + // the endpoint controller should not update the endpoint created for the original + // service since nothing has changed, the resource version has to be the same + endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("error fetching endpoints: %v", err) + } + if resVersion != endpoints.ObjectMeta.ResourceVersion { + t.Fatalf("endpoints resource version does not match, expected %s received %s", resVersion, endpoints.ObjectMeta.ResourceVersion) + } + +} + +func labelMap() map[string]string { + return map[string]string{"foo": "bar"} +} + +// newService returns a service with selector and exposing ports +func newService(namespace, name string) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labelMap(), + }, + Spec: v1.ServiceSpec{ + Selector: labelMap(), + Ports: []v1.ServicePort{ + {Name: "port-1338", Port: 1338, Protocol: "TCP", TargetPort: intstr.FromInt(1338)}, + {Name: "port-1337", Port: 1337, Protocol: "TCP", TargetPort: intstr.FromInt(1337)}, + }, + }, + } + +} diff --git a/test/integration/endpoints/main_test.go b/test/integration/endpoints/main_test.go new file mode 100644 index 00000000000..7138b8ae78d --- /dev/null +++ b/test/integration/endpoints/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoints + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +}