diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index 30b66837aa2..2d1593a1909 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -45,7 +45,9 @@ go_library( "//pkg/version/verflag:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/selection:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 13a751df477..374831fa20b 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -30,7 +30,9 @@ import ( v1 "k8s.io/api/core/v1" v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/healthz" @@ -603,9 +605,22 @@ func (s *ProxyServer) Run() error { } } + noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil) + if err != nil { + return err + } + + noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil) + if err != nil { + return err + } + + labelSelector := labels.NewSelector() + labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) + informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, informers.WithTweakListOptions(func(options *v1meta.ListOptions) { - options.LabelSelector = "!" + apis.LabelServiceProxyName + options.LabelSelector = labelSelector.String() })) // Create configs (i.e. Watches for Services and Endpoints) diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index d8d39d4a451..819759f4d1e 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/api/v1/endpoints:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/controller:go_default_library", "//pkg/util/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index b2157e75f70..9e51531a355 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1/endpoints" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" + helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" ) @@ -55,8 +56,8 @@ const ( // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s maxRetries = 15 - // An annotation on the Service denoting if the endpoints controller should - // go ahead and create endpoints for unready pods. This annotation is + // TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints + // controller should go ahead and create endpoints for unready pods. This annotation is // currently only used by StatefulSets, where we need the pod to be DNS // resolvable during initialization and termination. In this situation we // create a headless Service just for the StatefulSet, and clients shouldn't @@ -545,6 +546,16 @@ func (e *EndpointController) syncService(key string) error { delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime) } + if newEndpoints.Labels == nil { + newEndpoints.Labels = make(map[string]string) + } + + if !helper.IsServiceIPSet(service) { + newEndpoints.Labels[v1.IsHeadlessService] = "" + } else { + delete(newEndpoints.Labels, v1.IsHeadlessService) + } + klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps) if createEndpoints { // No previous endpoints, create them diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 2d1d200ed5f..28c1093d3f2 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -305,6 +305,9 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -346,6 +349,9 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -387,6 +393,9 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -424,6 +433,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -461,6 +473,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -498,6 +513,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -539,6 +557,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -610,6 +631,9 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", Name: "foo", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) @@ -651,6 +675,8 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { {Name: "port1", Port: 8088, Protocol: "TCP"}, }, }} + + serviceLabels[v1.IsHeadlessService] = "" data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", @@ -697,6 +723,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }) endpoints.syncService(ns + "/foo") + serviceLabels[v1.IsHeadlessService] = "" data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -797,6 +824,9 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -837,6 +867,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -873,6 +906,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -909,6 +945,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -934,6 +973,9 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -1222,6 +1264,9 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) { Annotations: map[string]string{ v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -1269,6 +1314,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { Annotations: map[string]string{ v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -1314,7 +1362,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", - Annotations: map[string]string{}, // Annotation not set anymore. + Labels: map[string]string{ + v1.IsHeadlessService: "", + }, // Annotation not set anymore. }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index dafd209857b..fa5610c270d 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" diff --git a/staging/src/k8s.io/api/core/v1/well_known_labels.go b/staging/src/k8s.io/api/core/v1/well_known_labels.go index 4497760d3f6..3287fb51fc2 100644 --- a/staging/src/k8s.io/api/core/v1/well_known_labels.go +++ b/staging/src/k8s.io/api/core/v1/well_known_labels.go @@ -33,4 +33,10 @@ const ( // LabelNamespaceNodeRestriction is a forbidden label namespace that kubelets may not self-set when the NodeRestriction admission plugin is enabled LabelNamespaceNodeRestriction = "node-restriction.kubernetes.io" + + // IsHeadlessService is added by Controller to an Endpoint denoting if its parent + // Service is Headless. The existence of this label can be used further by other + // controllers and kube-proxy to check if the Endpoint objects should be replicated when + // using Headless Services + IsHeadlessService = "service.kubernetes.io/headless" )