mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #94112 from damemi/sort-endpoints
Remove canonicalization of endpoints by endpoints controller for better comparison
This commit is contained in:
commit
1f708f6e62
@ -472,9 +472,18 @@ func (e *EndpointController) syncService(key string) error {
|
|||||||
|
|
||||||
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
|
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
|
||||||
|
|
||||||
|
// Compare the sorted subsets and labels
|
||||||
|
// Remove the HeadlessService label from the endpoints if it exists,
|
||||||
|
// as this won't be set on the service itself
|
||||||
|
// and will cause a false negative in this diff check.
|
||||||
|
// But first check if it has that label to avoid expensive copies.
|
||||||
|
compareLabels := currentEndpoints.Labels
|
||||||
|
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
|
||||||
|
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
|
||||||
|
}
|
||||||
if !createEndpoints &&
|
if !createEndpoints &&
|
||||||
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
|
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
|
||||||
apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
|
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) {
|
||||||
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
|
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -383,6 +383,33 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
|||||||
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
|
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) {
|
||||||
|
ns := metav1.NamespaceDefault
|
||||||
|
testServer, endpointsHandler := makeTestServer(t, ns)
|
||||||
|
defer testServer.Close()
|
||||||
|
endpoints := newController(testServer.URL, 0*time.Second)
|
||||||
|
endpoints.endpointsStore.Add(&v1.Endpoints{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: ns,
|
||||||
|
ResourceVersion: "1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
v1.IsHeadlessService: "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Subsets: []v1.EndpointSubset{},
|
||||||
|
})
|
||||||
|
endpoints.serviceStore.Add(&v1.Service{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
|
||||||
|
Spec: v1.ServiceSpec{
|
||||||
|
Selector: map[string]string{"foo": "bar"},
|
||||||
|
Ports: []v1.ServicePort{{Port: 80}},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
endpoints.syncService(ns + "/foo")
|
||||||
|
endpointsHandler.ValidateRequestCount(t, 0)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
||||||
ns := "other"
|
ns := "other"
|
||||||
testServer, endpointsHandler := makeTestServer(t, ns)
|
testServer, endpointsHandler := makeTestServer(t, ns)
|
||||||
|
@ -13,7 +13,6 @@ go_library(
|
|||||||
],
|
],
|
||||||
importpath = "k8s.io/kubernetes/pkg/registry/core/endpoint",
|
importpath = "k8s.io/kubernetes/pkg/registry/core/endpoint",
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api/endpoints:go_default_library",
|
|
||||||
"//pkg/api/legacyscheme:go_default_library",
|
"//pkg/api/legacyscheme:go_default_library",
|
||||||
"//pkg/apis/core:go_default_library",
|
"//pkg/apis/core:go_default_library",
|
||||||
"//pkg/apis/core/validation:go_default_library",
|
"//pkg/apis/core/validation:go_default_library",
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||||
"k8s.io/apiserver/pkg/storage/names"
|
"k8s.io/apiserver/pkg/storage/names"
|
||||||
endptspkg "k8s.io/kubernetes/pkg/api/endpoints"
|
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||||
api "k8s.io/kubernetes/pkg/apis/core"
|
api "k8s.io/kubernetes/pkg/apis/core"
|
||||||
"k8s.io/kubernetes/pkg/apis/core/validation"
|
"k8s.io/kubernetes/pkg/apis/core/validation"
|
||||||
@ -60,8 +59,6 @@ func (endpointsStrategy) Validate(ctx context.Context, obj runtime.Object) field
|
|||||||
|
|
||||||
// Canonicalize normalizes the object after validation.
|
// Canonicalize normalizes the object after validation.
|
||||||
func (endpointsStrategy) Canonicalize(obj runtime.Object) {
|
func (endpointsStrategy) Canonicalize(obj runtime.Object) {
|
||||||
endpoints := obj.(*api.Endpoints)
|
|
||||||
endpoints.Subsets = endptspkg.RepackSubsets(endpoints.Subsets)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AllowCreateOnUpdate is true for endpoints.
|
// AllowCreateOnUpdate is true for endpoints.
|
||||||
|
@ -50,6 +50,7 @@ filegroup(
|
|||||||
"//test/integration/deployment:all-srcs",
|
"//test/integration/deployment:all-srcs",
|
||||||
"//test/integration/disruption:all-srcs",
|
"//test/integration/disruption:all-srcs",
|
||||||
"//test/integration/dryrun:all-srcs",
|
"//test/integration/dryrun:all-srcs",
|
||||||
|
"//test/integration/endpoints:all-srcs",
|
||||||
"//test/integration/endpointslice:all-srcs",
|
"//test/integration/endpointslice:all-srcs",
|
||||||
"//test/integration/etcd:all-srcs",
|
"//test/integration/etcd:all-srcs",
|
||||||
"//test/integration/events:all-srcs",
|
"//test/integration/events:all-srcs",
|
||||||
|
35
test/integration/endpoints/BUILD
Normal file
35
test/integration/endpoints/BUILD
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_test")
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = [
|
||||||
|
"endpoints_test.go",
|
||||||
|
"main_test.go",
|
||||||
|
],
|
||||||
|
tags = ["integration"],
|
||||||
|
deps = [
|
||||||
|
"//pkg/controller/endpoint:go_default_library",
|
||||||
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||||
|
"//test/integration/framework:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
180
test/integration/endpoints/endpoints_test.go
Normal file
180
test/integration/endpoints/endpoints_test.go
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2020 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package endpoints
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
restclient "k8s.io/client-go/rest"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/endpoint"
|
||||||
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEndpointUpdates(t *testing.T) {
|
||||||
|
masterConfig := framework.NewIntegrationTestMasterConfig()
|
||||||
|
_, server, closeFn := framework.RunAMaster(masterConfig)
|
||||||
|
defer closeFn()
|
||||||
|
|
||||||
|
config := restclient.Config{Host: server.URL}
|
||||||
|
client, err := clientset.NewForConfig(&config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error creating clientset: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
informers := informers.NewSharedInformerFactory(client, 0)
|
||||||
|
|
||||||
|
epController := endpoint.NewEndpointController(
|
||||||
|
informers.Core().V1().Pods(),
|
||||||
|
informers.Core().V1().Services(),
|
||||||
|
informers.Core().V1().Endpoints(),
|
||||||
|
client,
|
||||||
|
0)
|
||||||
|
|
||||||
|
// Start informer and controllers
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
defer close(stopCh)
|
||||||
|
informers.Start(stopCh)
|
||||||
|
go epController.Run(1, stopCh)
|
||||||
|
|
||||||
|
// Create namespace
|
||||||
|
ns := framework.CreateTestingNamespace("test-endpoints-updates", server, t)
|
||||||
|
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||||
|
|
||||||
|
// Create a pod with labels
|
||||||
|
pod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "test-pod",
|
||||||
|
Namespace: ns.Name,
|
||||||
|
Labels: labelMap(),
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
NodeName: "fakenode",
|
||||||
|
Containers: []v1.Container{
|
||||||
|
{
|
||||||
|
Name: "fake-name",
|
||||||
|
Image: "fakeimage",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
createdPod, err := client.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set pod IPs
|
||||||
|
createdPod.Status = v1.PodStatus{
|
||||||
|
Phase: v1.PodRunning,
|
||||||
|
PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}},
|
||||||
|
}
|
||||||
|
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), createdPod, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a service associated to the pod
|
||||||
|
svc := newService(ns.Name, "foo1")
|
||||||
|
svc1, err := client.CoreV1().Services(ns.Name).Create(context.TODO(), svc, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Obtain ResourceVersion of the new endpoint created
|
||||||
|
var resVersion string
|
||||||
|
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
|
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("error fetching endpoints: %v", err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
resVersion = endpoints.ObjectMeta.ResourceVersion
|
||||||
|
return true, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("endpoints not found: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force recomputation on the endpoint controller
|
||||||
|
svc1.SetAnnotations(map[string]string{"foo": "bar"})
|
||||||
|
_, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), svc1, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to update service %s: %v", svc1.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new service and wait until it has been processed,
|
||||||
|
// this way we can be sure that the endpoint for the original service
|
||||||
|
// was recomputed before asserting, since we only have 1 worker
|
||||||
|
// in the endpoint controller
|
||||||
|
svc2 := newService(ns.Name, "foo2")
|
||||||
|
_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), svc2, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
|
_, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc2.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("error fetching endpoints: %v", err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("endpoints not found: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// the endpoint controller should not update the endpoint created for the original
|
||||||
|
// service since nothing has changed, the resource version has to be the same
|
||||||
|
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error fetching endpoints: %v", err)
|
||||||
|
}
|
||||||
|
if resVersion != endpoints.ObjectMeta.ResourceVersion {
|
||||||
|
t.Fatalf("endpoints resource version does not match, expected %s received %s", resVersion, endpoints.ObjectMeta.ResourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelMap() map[string]string {
|
||||||
|
return map[string]string{"foo": "bar"}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newService returns a service with selector and exposing ports
|
||||||
|
func newService(namespace, name string) *v1.Service {
|
||||||
|
return &v1.Service{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Namespace: namespace,
|
||||||
|
Labels: labelMap(),
|
||||||
|
},
|
||||||
|
Spec: v1.ServiceSpec{
|
||||||
|
Selector: labelMap(),
|
||||||
|
Ports: []v1.ServicePort{
|
||||||
|
{Name: "port-1338", Port: 1338, Protocol: "TCP", TargetPort: intstr.FromInt(1338)},
|
||||||
|
{Name: "port-1337", Port: 1337, Protocol: "TCP", TargetPort: intstr.FromInt(1337)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
27
test/integration/endpoints/main_test.go
Normal file
27
test/integration/endpoints/main_test.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2020 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package endpoints
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
framework.EtcdMain(m.Run)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user