From e3892371719724bd9269e0a18dc95c4d07bf179c Mon Sep 17 00:00:00 2001 From: Ricardo Pchevuzinske Katz Date: Mon, 19 Aug 2019 16:55:37 -0300 Subject: [PATCH 1/2] Remove watching Endpoints of Headless Services Signed-off-by: Ricardo Pchevuzinske Katz --- cmd/kube-proxy/app/BUILD | 2 + cmd/kube-proxy/app/server.go | 17 +++++- pkg/controller/endpoint/BUILD | 1 + .../endpoint/endpoints_controller.go | 15 +++++- .../endpoint/endpoints_controller_test.go | 52 ++++++++++++++++++- pkg/proxy/endpoints.go | 1 + .../k8s.io/api/core/v1/well_known_labels.go | 6 +++ 7 files changed, 90 insertions(+), 4 deletions(-) diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index 30b66837aa2..2d1593a1909 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -45,7 +45,9 @@ go_library( "//pkg/version/verflag:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/selection:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 13a751df477..374831fa20b 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -30,7 +30,9 @@ import ( v1 "k8s.io/api/core/v1" v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/healthz" @@ -603,9 +605,22 @@ func (s *ProxyServer) Run() error { } } + noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil) + if err != nil { + return err + } + + noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil) + if err != nil { + return err + } + + labelSelector := labels.NewSelector() + labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) + informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, informers.WithTweakListOptions(func(options *v1meta.ListOptions) { - options.LabelSelector = "!" + apis.LabelServiceProxyName + options.LabelSelector = labelSelector.String() })) // Create configs (i.e. Watches for Services and Endpoints) diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index d8d39d4a451..819759f4d1e 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/api/v1/endpoints:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/controller:go_default_library", "//pkg/util/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index b2157e75f70..9e51531a355 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1/endpoints" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" + helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" ) @@ -55,8 +56,8 @@ const ( // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s maxRetries = 15 - // An annotation on the Service denoting if the endpoints controller should - // go ahead and create endpoints for unready pods. This annotation is + // TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints + // controller should go ahead and create endpoints for unready pods. This annotation is // currently only used by StatefulSets, where we need the pod to be DNS // resolvable during initialization and termination. In this situation we // create a headless Service just for the StatefulSet, and clients shouldn't @@ -545,6 +546,16 @@ func (e *EndpointController) syncService(key string) error { delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime) } + if newEndpoints.Labels == nil { + newEndpoints.Labels = make(map[string]string) + } + + if !helper.IsServiceIPSet(service) { + newEndpoints.Labels[v1.IsHeadlessService] = "" + } else { + delete(newEndpoints.Labels, v1.IsHeadlessService) + } + klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps) if createEndpoints { // No previous endpoints, create them diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 2d1d200ed5f..28c1093d3f2 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -305,6 +305,9 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -346,6 +349,9 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -387,6 +393,9 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -424,6 +433,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -461,6 +473,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -498,6 +513,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -539,6 +557,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -610,6 +631,9 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", Name: "foo", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) @@ -651,6 +675,8 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { {Name: "port1", Port: 8088, Protocol: "TCP"}, }, }} + + serviceLabels[v1.IsHeadlessService] = "" data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", @@ -697,6 +723,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }) endpoints.syncService(ns + "/foo") + serviceLabels[v1.IsHeadlessService] = "" data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -797,6 +824,9 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -837,6 +867,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -873,6 +906,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -909,6 +945,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -934,6 +973,9 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -1222,6 +1264,9 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) { Annotations: map[string]string{ v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -1269,6 +1314,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { Annotations: map[string]string{ v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -1314,7 +1362,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", - Annotations: map[string]string{}, // Annotation not set anymore. + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, // Annotation not set anymore. }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index dafd209857b..fa5610c270d 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" diff --git a/staging/src/k8s.io/api/core/v1/well_known_labels.go b/staging/src/k8s.io/api/core/v1/well_known_labels.go index 4497760d3f6..3287fb51fc2 100644 --- a/staging/src/k8s.io/api/core/v1/well_known_labels.go +++ b/staging/src/k8s.io/api/core/v1/well_known_labels.go @@ -33,4 +33,10 @@ const ( // LabelNamespaceNodeRestriction is a forbidden label namespace that kubelets may not self-set when the NodeRestriction admission plugin is enabled LabelNamespaceNodeRestriction = "node-restriction.kubernetes.io" + + // IsHeadlessService is added by Controller to an Endpoint denoting if its parent + // Service is Headless. The existence of this label can be used further by other + // controllers and kube-proxy to check if the Endpoint objects should be replicated when + // using Headless Services + IsHeadlessService = "service.kubernetes.io/headless" ) From 6c30d23e6c9f5a9563d71981b264d6f85c56e09e Mon Sep 17 00:00:00 2001 From: Ricardo Pchevuzinske Katz Date: Sun, 25 Aug 2019 17:27:08 -0300 Subject: [PATCH 2/2] Remove watching Endpoints of Headless Services Signed-off-by: Ricardo Pchevuzinske Katz --- test/e2e/network/service.go | 62 +++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 11ba96d4aa5..541569a030c 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -1865,6 +1865,68 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcDisabledIP, servicePort)) }) + ginkgo.It("should implement service.kubernetes.io/headless", func() { + // this test uses e2essh.NodeSSHHosts that does not work if a Node only reports LegacyHostIP + framework.SkipUnlessProviderIs(framework.ProvidersWithSSH...) + // this test does not work if the Node does not support SSH Key + framework.SkipUnlessSSHKeyPresent() + + ns := f.Namespace.Name + numPods, servicePort := 3, defaultServeHostnameServicePort + serviceHeadlessLabels := map[string]string{v1.IsHeadlessService: ""} + + // We will create 2 services to test creating services in both states and also dynamic updates + // svcHeadless: Created with the label, will always be disabled. We create this early and + // test again late to make sure it never becomes available. + // svcHeadlessToggled: Created without the label then the label is toggled verifying reachability at each step. + + ginkgo.By("creating service-headless in namespace " + ns) + svcHeadless := getServeHostnameService("service-headless") + svcHeadless.ObjectMeta.Labels = serviceHeadlessLabels + // This should be improved, as we do not want a Headlesss Service to contain an IP... + _, svcHeadlessIP, err := e2eservice.StartServeHostnameService(cs, svcHeadless, ns, numPods) + framework.ExpectNoError(err, "failed to create replication controller with headless service: %s in the namespace: %s", svcHeadlessIP, ns) + + ginkgo.By("creating service in namespace " + ns) + svcHeadlessToggled := getServeHostnameService("service-headless-toggled") + podHeadlessToggledNames, svcHeadlessToggledIP, err := e2eservice.StartServeHostnameService(cs, svcHeadlessToggled, ns, numPods) + framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcHeadlessToggledIP, ns) + + jig := e2eservice.NewTestJig(cs, svcHeadlessToggled.ObjectMeta.Name) + + hosts, err := e2essh.NodeSSHHosts(cs) + framework.ExpectNoError(err, "failed to find external/internal IPs for every node") + if len(hosts) == 0 { + e2elog.Failf("No ssh-able nodes") + } + host := hosts[0] + + ginkgo.By("verifying service is up") + framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort)) + + ginkgo.By("verifying service-headless is not up") + framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcHeadlessIP, servicePort)) + + ginkgo.By("adding service.kubernetes.io/headless label") + jig.UpdateServiceOrFail(ns, svcHeadlessToggled.ObjectMeta.Name, func(svc *v1.Service) { + svc.ObjectMeta.Labels = serviceHeadlessLabels + }) + + ginkgo.By("verifying service is not up") + framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcHeadlessToggledIP, servicePort)) + + ginkgo.By("removing service.kubernetes.io/headless annotation") + jig.UpdateServiceOrFail(ns, svcHeadlessToggled.ObjectMeta.Name, func(svc *v1.Service) { + svc.ObjectMeta.Labels = nil + }) + + ginkgo.By("verifying service is up") + framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort)) + + ginkgo.By("verifying service-headless is still not up") + framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcHeadlessIP, servicePort)) + }) + ginkgo.It("should be rejected when no endpoints exist", func() { namespace := f.Namespace.Name serviceName := "no-pods"