From 5cb8e8e1d6dc8d3e06b86fccb2c5ad7aafdf6f37 Mon Sep 17 00:00:00 2001 From: bprashanth Date: Mon, 10 Oct 2016 11:31:12 -0700 Subject: [PATCH 1/5] Fix health check node port leak --- pkg/proxy/iptables/proxier.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 0feda54e011..c9a886004b1 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -455,18 +455,20 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) info.sessionAffinityType = service.Spec.SessionAffinity info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges - info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() + info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort) if info.onlyNodeLocalEndpoints { p := apiservice.GetServiceHealthCheckNodePort(service) if p == 0 { glog.Errorf("Service does not contain necessary annotation %v", apiservice.AnnotationHealthCheckNodePort) } else { + glog.V(4).Infof("Adding health check for %+v, port %v", serviceName.NamespacedName, p) info.healthCheckNodePort = int(p) // Turn on healthcheck responder to listen on the health check nodePort healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort) } } else { + glog.V(4).Infof("Deleting health check for %+v", serviceName.NamespacedName) // Delete healthcheck responders, if any, previously listening for this service healthcheck.DeleteServiceListener(serviceName.NamespacedName, 0) } @@ -488,6 +490,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { // Remove ServiceListener health check nodePorts from the health checker // TODO - Stats + glog.V(4).Infof("Deleting health check for %+v, port %v", name.NamespacedName, info.healthCheckNodePort) healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort) } } From a46a849b9e67014b5f3c0c10dd423c36eb06863a Mon Sep 17 00:00:00 2001 From: bprashanth Date: Mon, 3 Oct 2016 11:51:57 -0700 Subject: [PATCH 2/5] Promote source ip annotations to beta --- pkg/api/service/annotations.go | 56 ++++++++++++++++++-------- pkg/cloudprovider/providers/gce/gce.go | 4 +- pkg/proxy/iptables/proxier.go | 2 +- pkg/registry/core/service/rest.go | 32 ++++++++------- pkg/registry/core/service/rest_test.go | 30 +++++++------- 5 files changed, 74 insertions(+), 50 deletions(-) diff --git a/pkg/api/service/annotations.go b/pkg/api/service/annotations.go index bd13cc51664..4b45549983b 100644 --- a/pkg/api/service/annotations.go +++ b/pkg/api/service/annotations.go @@ -33,29 +33,44 @@ const ( // Not all cloud providers support this annotation, though AWS & GCE do. AnnotationLoadBalancerSourceRangesKey = "service.beta.kubernetes.io/load-balancer-source-ranges" - // AnnotationExternalTraffic An annotation that denotes if this Service desires to route external traffic to local - // endpoints only. This preserves Source IP and avoids a second hop. 
- AnnotationExternalTraffic = "service.alpha.kubernetes.io/external-traffic" // AnnotationValueExternalTrafficLocal Value of annotation to specify local endpoints behaviour AnnotationValueExternalTrafficLocal = "OnlyLocal" // AnnotationValueExternalTrafficGlobal Value of annotation to specify global (legacy) behaviour AnnotationValueExternalTrafficGlobal = "Global" - // AnnotationHealthCheckNodePort Annotation specifying the healthcheck nodePort for the service + + // TODO: The alpha annotations have been deprecated, remove them when we move this feature to GA. + + // AlphaAnnotationHealthCheckNodePort Annotation specifying the healthcheck nodePort for the service // If not specified, annotation is created by the service api backend with the allocated nodePort // Will use user-specified nodePort value if specified by the client - AnnotationHealthCheckNodePort = "service.alpha.kubernetes.io/healthcheck-nodeport" + AlphaAnnotationHealthCheckNodePort = "service.alpha.kubernetes.io/healthcheck-nodeport" + + // AlphaAnnotationExternalTraffic An annotation that denotes if this Service desires to route external traffic to local + // endpoints only. This preserves Source IP and avoids a second hop. + AlphaAnnotationExternalTraffic = "service.alpha.kubernetes.io/external-traffic" + + // BetaAnnotationHealthCheckNodePort is the beta version of AlphaAnnotationHealthCheckNodePort. + BetaAnnotationHealthCheckNodePort = "service.beta.kubernetes.io/healthcheck-nodeport" + + // BetaAnnotationExternalTraffic is the beta version of AlphaAnnotationExternalTraffic. + BetaAnnotationExternalTraffic = "service.beta.kubernetes.io/external-traffic" ) // NeedsHealthCheck Check service for health check annotations func NeedsHealthCheck(service *api.Service) bool { - if l, ok := service.Annotations[AnnotationExternalTraffic]; ok { - if l == AnnotationValueExternalTrafficLocal { - return true - } else if l == AnnotationValueExternalTrafficGlobal { - return false - } else { - glog.Errorf("Invalid value for annotation %v", AnnotationExternalTraffic) - return false + // First check the alpha annotation and then the beta. This is so existing + // Services continue to work till the user decides to transition to beta. + // If they transition to beta, there's no way to go back to alpha without + // rolling back the cluster. + for _, annotation := range []string{AlphaAnnotationExternalTraffic, BetaAnnotationExternalTraffic} { + if l, ok := service.Annotations[annotation]; ok { + if l == AnnotationValueExternalTrafficLocal { + return true + } else if l == AnnotationValueExternalTrafficGlobal { + return false + } else { + glog.Errorf("Invalid value for annotation %v: %v", annotation, l) + } } } return false @@ -63,12 +78,19 @@ func NeedsHealthCheck(service *api.Service) bool { // GetServiceHealthCheckNodePort Return health check node port annotation for service, if one exists func GetServiceHealthCheckNodePort(service *api.Service) int32 { - if NeedsHealthCheck(service) { - if l, ok := service.Annotations[AnnotationHealthCheckNodePort]; ok { + if !NeedsHealthCheck(service) { + return 0 + } + // First check the alpha annotation and then the beta. This is so existing + // Services continue to work till the user decides to transition to beta. + // If they transition to beta, there's no way to go back to alpha without + // rolling back the cluster. 
+ for _, annotation := range []string{AlphaAnnotationHealthCheckNodePort, BetaAnnotationHealthCheckNodePort} { + if l, ok := service.Annotations[annotation]; ok { p, err := strconv.Atoi(l) if err != nil { - glog.Errorf("Failed to parse annotation %v: %v", AnnotationHealthCheckNodePort, err) - return 0 + glog.Errorf("Failed to parse annotation %v: %v", annotation, err) + continue } return int32(p) } diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 30eb35a2794..0de20d2b1d3 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -736,9 +736,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv // This logic exists to detect a transition for a pre-existing service and turn on // the tpNeedsUpdate flag to delete/recreate fwdrule/tpool adding the health check // to the target pool. - glog.V(2).Infof("Annotation %s=%s added to new or pre-existing service", - apiservice.AnnotationExternalTraffic, - apiservice.AnnotationValueExternalTrafficLocal) + glog.V(2).Infof("Annotation external-traffic=OnlyLocal added to new or pre-existing service") tpNeedsUpdate = true } hcToCreate, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index c9a886004b1..4f08a302b87 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -460,7 +460,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { p := apiservice.GetServiceHealthCheckNodePort(service) if p == 0 { glog.Errorf("Service does not contain necessary annotation %v", - apiservice.AnnotationHealthCheckNodePort) + apiservice.BetaAnnotationHealthCheckNodePort) } else { glog.V(4).Infof("Adding health check for %+v, port %v", serviceName.NamespacedName, p) info.healthCheckNodePort = int(p) diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/rest.go index 37886a97336..10ef74d8f62 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/rest.go @@ -161,10 +161,10 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err if shouldCheckOrAssignHealthCheckNodePort(service) { var healthCheckNodePort int var err error - if l, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok { + if l, ok := service.Annotations[apiservice.BetaAnnotationHealthCheckNodePort]; ok { healthCheckNodePort, err = strconv.Atoi(l) if err != nil || healthCheckNodePort <= 0 { - return nil, errors.NewInternalError(fmt.Errorf("Failed to parse annotation %v: %v", apiservice.AnnotationHealthCheckNodePort, err)) + return nil, errors.NewInternalError(fmt.Errorf("Failed to parse annotation %v: %v", apiservice.BetaAnnotationHealthCheckNodePort, err)) } } if healthCheckNodePort > 0 { @@ -183,7 +183,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err return nil, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) } // Insert the newly allocated health check port as an annotation (plan of record for Alpha) - service.Annotations[apiservice.AnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort) + service.Annotations[apiservice.BetaAnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort) } } @@ -313,7 +313,7 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service) (boo errmsg := fmt.Sprintf("Failed to allocate requested HealthCheck nodePort 
%v:%v", requestedHealthCheckNodePort, err) el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), - apiservice.AnnotationHealthCheckNodePort, errmsg)} + apiservice.BetaAnnotationHealthCheckNodePort, errmsg)} return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) } glog.Infof("Reserved user requested nodePort: %d", requestedHealthCheckNodePort) @@ -327,7 +327,7 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service) (boo return false, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) } // Insert the newly allocated health check port as an annotation (plan of record for Alpha) - service.Annotations[apiservice.AnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort) + service.Annotations[apiservice.BetaAnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort) glog.Infof("Reserved health check nodePort: %d", healthCheckNodePort) } @@ -338,29 +338,33 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service) (boo glog.Warningf("Error releasing service health check %s node port %d: %v", service.Name, oldHealthCheckNodePort, err) return false, errors.NewInternalError(fmt.Errorf("failed to free health check nodePort: %v", err)) } else { - delete(service.Annotations, apiservice.AnnotationHealthCheckNodePort) + delete(service.Annotations, apiservice.BetaAnnotationHealthCheckNodePort) + delete(service.Annotations, apiservice.AlphaAnnotationHealthCheckNodePort) glog.Infof("Freed health check nodePort: %d", oldHealthCheckNodePort) } case !oldServiceHasHealthCheckNodePort && !assignHealthCheckNodePort: - if _, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok { + if _, ok := service.Annotations[apiservice.BetaAnnotationHealthCheckNodePort]; ok { glog.Warningf("Attempt to insert health check node port annotation DENIED") el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), - apiservice.AnnotationHealthCheckNodePort, "Cannot insert healthcheck nodePort annotation")} + apiservice.BetaAnnotationHealthCheckNodePort, "Cannot insert healthcheck nodePort annotation")} return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) } case oldServiceHasHealthCheckNodePort && assignHealthCheckNodePort: - if _, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; !ok { - glog.Warningf("Attempt to delete health check node port annotation DENIED") - el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), - apiservice.AnnotationHealthCheckNodePort, "Cannot delete healthcheck nodePort annotation")} - return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) + for _, annotation := range []string{ + apiservice.AlphaAnnotationHealthCheckNodePort, apiservice.BetaAnnotationHealthCheckNodePort} { + if _, ok := service.Annotations[annotation]; !ok { + glog.Warningf("Attempt to delete health check node port annotation DENIED") + el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), + annotation, "Cannot delete healthcheck nodePort annotation")} + return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) + } } if oldHealthCheckNodePort != requestedHealthCheckNodePort { glog.Warningf("Attempt to change value of health check node port annotation DENIED") el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), - apiservice.AnnotationHealthCheckNodePort, "Cannot change healthcheck nodePort during update")} + 
apiservice.BetaAnnotationHealthCheckNodePort, "Cannot change healthcheck nodePort during update")} return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) } } diff --git a/pkg/registry/core/service/rest_test.go b/pkg/registry/core/service/rest_test.go index 806414ea796..b479a039933 100644 --- a/pkg/registry/core/service/rest_test.go +++ b/pkg/registry/core/service/rest_test.go @@ -955,7 +955,7 @@ func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortAllocation(t svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", Annotations: map[string]string{ - service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, }, }, Spec: api.ServiceSpec{ @@ -975,25 +975,25 @@ func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortAllocation(t } created_service := created_svc.(*api.Service) if !service.NeedsHealthCheck(created_service) { - t.Errorf("Unexpected missing annotation %s", service.AnnotationExternalTraffic) + t.Errorf("Unexpected missing annotation %s", service.BetaAnnotationExternalTraffic) } port := service.GetServiceHealthCheckNodePort(created_service) if port == 0 { - t.Errorf("Failed to allocate and create the health check node port annotation %s", service.AnnotationHealthCheckNodePort) + t.Errorf("Failed to allocate and create the health check node port annotation %s", service.BetaAnnotationHealthCheckNodePort) } } // Validate using the user specified nodePort when the externalTraffic=OnlyLocal annotation is set // and type is LoadBalancer -func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortUserAllocation(t *testing.T) { +func TestServiceRegistryExternalTrafficBetaAnnotationHealthCheckNodePortUserAllocation(t *testing.T) { ctx := api.NewDefaultContext() storage, _ := NewTestREST(t, nil) svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", Annotations: map[string]string{ - service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, - service.AnnotationHealthCheckNodePort: "30200", + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + service.BetaAnnotationHealthCheckNodePort: "30200", }, }, Spec: api.ServiceSpec{ @@ -1013,11 +1013,11 @@ func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortUserAllocati } created_service := created_svc.(*api.Service) if !service.NeedsHealthCheck(created_service) { - t.Errorf("Unexpected missing annotation %s", service.AnnotationExternalTraffic) + t.Errorf("Unexpected missing annotation %s", service.BetaAnnotationExternalTraffic) } port := service.GetServiceHealthCheckNodePort(created_service) if port == 0 { - t.Errorf("Failed to allocate and create the health check node port annotation %s", service.AnnotationHealthCheckNodePort) + t.Errorf("Failed to allocate and create the health check node port annotation %s", service.BetaAnnotationHealthCheckNodePort) } if port != 30200 { t.Errorf("Failed to allocate requested nodePort expected 30200, got %d", port) @@ -1031,8 +1031,8 @@ func TestServiceRegistryExternalTrafficAnnotationNegative(t *testing.T) { svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", Annotations: map[string]string{ - service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, - service.AnnotationHealthCheckNodePort: "-1", + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + 
service.BetaAnnotationHealthCheckNodePort: "-1", }, }, Spec: api.ServiceSpec{ @@ -1060,7 +1060,7 @@ func TestServiceRegistryExternalTrafficAnnotationGlobal(t *testing.T) { svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", Annotations: map[string]string{ - service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal, + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal, }, }, Spec: api.ServiceSpec{ @@ -1081,12 +1081,12 @@ func TestServiceRegistryExternalTrafficAnnotationGlobal(t *testing.T) { created_service := created_svc.(*api.Service) // Make sure the service does not have the annotation if service.NeedsHealthCheck(created_service) { - t.Errorf("Unexpected value for annotation %s", service.AnnotationExternalTraffic) + t.Errorf("Unexpected value for annotation %s", service.BetaAnnotationExternalTraffic) } // Make sure the service does not have the health check node port allocated port := service.GetServiceHealthCheckNodePort(created_service) if port != 0 { - t.Errorf("Unexpected allocation of health check node port annotation %s", service.AnnotationHealthCheckNodePort) + t.Errorf("Unexpected allocation of health check node port annotation %s", service.BetaAnnotationHealthCheckNodePort) } } @@ -1097,7 +1097,7 @@ func TestServiceRegistryExternalTrafficAnnotationClusterIP(t *testing.T) { svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", Annotations: map[string]string{ - service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal, + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal, }, }, Spec: api.ServiceSpec{ @@ -1119,6 +1119,6 @@ func TestServiceRegistryExternalTrafficAnnotationClusterIP(t *testing.T) { // Make sure that ClusterIP services do not have the health check node port allocated port := service.GetServiceHealthCheckNodePort(created_service) if port != 0 { - t.Errorf("Unexpected allocation of health check node port annotation %s", service.AnnotationHealthCheckNodePort) + t.Errorf("Unexpected allocation of health check node port annotation %s", service.BetaAnnotationHealthCheckNodePort) } } From eb235a82183b4e004baa06f93d9d6c7f2316913a Mon Sep 17 00:00:00 2001 From: bprashanth Date: Mon, 3 Oct 2016 11:53:28 -0700 Subject: [PATCH 3/5] E2e tests --- test/e2e/service.go | 507 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 426 insertions(+), 81 deletions(-) diff --git a/test/e2e/service.go b/test/e2e/service.go index c3425d4778a..a94d0c7d156 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -65,6 +65,13 @@ const ( //TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable loadBalancerCreateTimeoutDefault = 20 * time.Minute loadBalancerCreateTimeoutLarge = time.Hour + + largeClusterMinNodesNumber = 100 + + // Don't test with more than 3 nodes. + // Many tests create an endpoint per node, in large clusters, this is + // resource and time intensive. 
+ maxNodesForEndpointsTests = 3 ) // This should match whatever the default/configured range is @@ -1073,79 +1080,293 @@ var _ = framework.KubeDescribe("Services", func() { framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout) } }) +}) - It("should be able to create services of type LoadBalancer and externalTraffic=localOnly [Slow][Feature:ExternalTrafficLocalOnly]", func() { +var _ = framework.KubeDescribe("ESIPP [Slow][Feature:ExternalTrafficLocalOnly]", func() { + f := framework.NewDefaultFramework("esipp") + loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault + + var c *client.Client + var cs clientset.Interface + + BeforeEach(func() { // requires cloud load-balancer support - this feature currently supported only on GCE/GKE framework.SkipUnlessProviderIs("gce", "gke") - loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault - largeClusterMinNodesNumber := 100 - if nodes := framework.GetReadySchedulableNodesOrDie(cs); len(nodes.Items) > largeClusterMinNodesNumber { + c = f.Client + cs = f.ClientSet + if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber { loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge } + }) + + It("should work for type=LoadBalancer [Slow][Feature:ExternalTrafficLocalOnly]", func() { namespace := f.Namespace.Name serviceName := "external-local" jig := NewServiceTestJig(c, cs, serviceName) - By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only") - svc := jig.CreateTCPServiceOrFail(namespace, func(svc *api.Service) { - svc.Spec.Type = api.ServiceTypeLoadBalancer - // We need to turn affinity off for our LB distribution tests - svc.Spec.SessionAffinity = api.ServiceAffinityNone - svc.ObjectMeta.Annotations = map[string]string{ - service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} - svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}} - }) - By("creating a pod to be part of the service " + serviceName) - // This container is an nginx container listening on port 80 - // See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response - jig.RunOrFail(namespace, nil) - By("waiting for loadbalancer for service " + namespace + "/" + serviceName) - svc = jig.WaitForLoadBalancerOrFail(namespace, serviceName, loadBalancerCreateTimeout) - jig.SanityCheckService(svc, api.ServiceTypeLoadBalancer) - svcTcpPort := int(svc.Spec.Ports[0].Port) - framework.Logf("service port : %d", svcTcpPort) - ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) - framework.Logf("TCP load balancer: %s", ingressIP) + + svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true) healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc)) - By("checking health check node port allocated") if healthCheckNodePort == 0 { framework.Failf("Service HealthCheck NodePort was not allocated") } - // TODO(33957): test localOnly nodePort Services. - By("hitting the TCP service's service port, via its external VIP " + ingressIP + ":" + fmt.Sprintf("%d", svcTcpPort)) - jig.TestReachableHTTP(ingressIP, svcTcpPort, kubeProxyLagTimeout) + defer func() { + jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout) + + // Make sure we didn't leak the health check node port. 
+ for name, ips := range jig.getEndpointNodes(svc) { + _, fail, status := jig.TestHTTPHealthCheckNodePort(ips[0], healthCheckNodePort, "/healthz", 5) + if fail < 2 { + framework.Failf("Health check node port %v not released on node %v: %v", healthCheckNodePort, name, status) + } + break + } + Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred()) + }() + + svcTCPPort := int(svc.Spec.Ports[0].Port) + ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) + By("reading clientIP using the TCP service's service port via its external VIP") - content := jig.GetHTTPContent(ingressIP, svcTcpPort, kubeProxyLagTimeout, "/clientip") + content := jig.GetHTTPContent(ingressIP, svcTCPPort, kubeProxyLagTimeout, "/clientip") clientIP := content.String() framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP) + By("checking if Source IP is preserved") if strings.HasPrefix(clientIP, "10.") { framework.Failf("Source IP was NOT preserved") } - By("finding nodes for all service endpoints") - endpoints, err := c.Endpoints(namespace).Get(serviceName) - if err != nil { - framework.Failf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err) + }) + + It("should work for type=NodePort [Slow][Feature:ExternalTrafficLocalOnly]", func() { + namespace := f.Namespace.Name + serviceName := "external-local" + jig := NewServiceTestJig(c, cs, serviceName) + + svc := jig.createOnlyLocalNodePortService(namespace, serviceName, true) + defer func() { + Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred()) + }() + + tcpNodePort := int(svc.Spec.Ports[0].NodePort) + endpointsNodeMap := jig.getEndpointNodes(svc) + path := "/clientip" + + for nodeName, nodeIPs := range endpointsNodeMap { + nodeIP := nodeIPs[0] + By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v%v%v", nodeName, nodeIP, tcpNodePort, path)) + content := jig.GetHTTPContent(nodeIP, tcpNodePort, kubeProxyLagTimeout, path) + clientIP := content.String() + framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP) + if strings.HasPrefix(clientIP, "10.") { + framework.Failf("Source IP was NOT preserved") + } } - if len(endpoints.Subsets[0].Addresses) == 0 { - framework.Failf("Expected Ready endpoints - found none") + }) + + It("should only target nodes with endpoints [Slow][Feature:ExternalTrafficLocalOnly]", func() { + namespace := f.Namespace.Name + serviceName := "external-local" + jig := NewServiceTestJig(c, cs, serviceName) + nodes := jig.getNodes(maxNodesForEndpointsTests) + + svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, false) + defer func() { + jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout) + Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred()) + }() + + healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc)) + if healthCheckNodePort == 0 { + framework.Failf("Service HealthCheck NodePort was not allocated") } - readyHostName := *endpoints.Subsets[0].Addresses[0].NodeName - framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, readyHostName) - // HealthCheck responder validation - iterate over all node IPs and check their HC responses - // Collect all node names and their public IPs - the nodes and ips slices parallel each other - nodes := framework.GetReadySchedulableNodesOrDie(jig.ClientSet) + ips := collectAddresses(nodes, api.NodeExternalIP) if len(ips) 
== 0 { ips = collectAddresses(nodes, api.NodeLegacyHostIP) } - By("checking kube-proxy health check responses are correct") - for n, publicIP := range ips { - framework.Logf("Checking health check response for node %s, public IP %s", nodes.Items[n].Name, publicIP) + + ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) + svcTCPPort := int(svc.Spec.Ports[0].Port) + + threshold := 2 + path := "/healthz" + for i := 0; i < len(nodes.Items); i++ { + endpointNodeName := nodes.Items[i].Name + + By("creating a pod to be part of the service " + serviceName + " on node " + endpointNodeName) + jig.RunOrFail(namespace, func(rc *api.ReplicationController) { + rc.Name = serviceName + if endpointNodeName != "" { + rc.Spec.Template.Spec.NodeName = endpointNodeName + } + }) + + By(fmt.Sprintf("waiting for service endpoint on node %v", endpointNodeName)) + jig.waitForEndpointOnNode(namespace, serviceName, endpointNodeName) + // HealthCheck should pass only on the node where num(endpoints) > 0 // All other nodes should fail the healthcheck on the service healthCheckNodePort - expectedSuccess := nodes.Items[n].Name == readyHostName - jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, "/healthz", expectedSuccess) + for n, publicIP := range ips { + expectedSuccess := nodes.Items[n].Name == endpointNodeName + framework.Logf("Health checking %s, http://%s:%d/%s, expectedSuccess %v", nodes.Items[n].Name, publicIP, healthCheckNodePort, path, expectedSuccess) + pass, fail, err := jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, 5) + if expectedSuccess && pass < threshold { + framework.Failf("Expected %s successes on %v/%v, got %d, err %v", threshold, endpointNodeName, path, pass, err) + } else if !expectedSuccess && fail < threshold { + framework.Failf("Expected %s failures on %v/%v, got %d, err %v", threshold, endpointNodeName, path, fail, err) + } + // Make sure the loadbalancer picked up the helth check change + jig.TestReachableHTTP(ingressIP, svcTCPPort, kubeProxyLagTimeout) + } + framework.ExpectNoError(framework.DeleteRCAndPods(c, f.ClientSet, namespace, serviceName)) + } + }) + + It("should work from pods [Slow][Feature:ExternalTrafficLocalOnly]", func() { + namespace := f.Namespace.Name + serviceName := "external-local" + jig := NewServiceTestJig(c, cs, serviceName) + nodes := jig.getNodes(maxNodesForEndpointsTests) + + svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true) + defer func() { + jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout) + Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred()) + }() + + ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) + path := fmt.Sprintf("%s:%d/clientip", ingressIP, int(svc.Spec.Ports[0].Port)) + nodeName := nodes.Items[0].Name + podName := "execpod-sourceip" + + By(fmt.Sprintf("Creating %v on node %v", podName, nodeName)) + execPodName := createExecPodOnNode(f.Client, namespace, nodeName, podName) + defer func() { + err := c.Pods(namespace).Delete(execPodName, nil) + Expect(err).NotTo(HaveOccurred()) + }() + execPod, err := f.Client.Pods(namespace).Get(execPodName) + ExpectNoError(err) + + framework.Logf("Waiting up to %v wget %v", kubeProxyLagTimeout, path) + cmd := fmt.Sprintf(`wget -T 30 -qO- %v`, path) + + var srcIP string + By(fmt.Sprintf("Hitting external lb %v from pod %v on node %v", ingressIP, podName, nodeName)) + if pollErr := wait.PollImmediate(framework.Poll, 
loadBalancerCreateTimeoutDefault, func() (bool, error) { + stdout, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + if err != nil { + framework.Logf("got err: %v, retry until timeout", err) + return false, nil + } + srcIP = strings.TrimSpace(strings.Split(stdout, ":")[0]) + return srcIP == execPod.Status.PodIP, nil + }); pollErr != nil { + framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", podName, execPod.Status.PodIP, srcIP) + } + }) + + It("should handle updates to source ip annotation [Slow][Feature:ExternalTrafficLocalOnly]", func() { + namespace := f.Namespace.Name + serviceName := "external-local" + jig := NewServiceTestJig(c, cs, serviceName) + + nodes := jig.getNodes(maxNodesForEndpointsTests) + if len(nodes.Items) < 2 { + framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint") + } + + svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true) + defer func() { + jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout) + Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred()) + }() + + // save the health check node port because it disappears when lift the annotation. + healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc)) + + By("turning ESIPP off") + svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *api.Service) { + svc.ObjectMeta.Annotations[service.BetaAnnotationExternalTraffic] = + service.AnnotationValueExternalTrafficGlobal + }) + if service.GetServiceHealthCheckNodePort(svc) > 0 { + framework.Failf("Service HealthCheck NodePort annotation still present") + } + + endpointNodeMap := jig.getEndpointNodes(svc) + noEndpointNodeMap := map[string][]string{} + for _, n := range nodes.Items { + if _, ok := endpointNodeMap[n.Name]; ok { + continue + } + noEndpointNodeMap[n.Name] = getNodeAddresses(&n, api.NodeExternalIP) + } + + svcTCPPort := int(svc.Spec.Ports[0].Port) + svcNodePort := int(svc.Spec.Ports[0].NodePort) + ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) + path := "/clientip" + + By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap)) + for nodeName, nodeIPs := range noEndpointNodeMap { + By(fmt.Sprintf("Checking %v (%v:%v%v) proxies to endpoints on another node", nodeName, nodeIPs[0], path, svcNodePort)) + jig.GetHTTPContent(nodeIPs[0], svcNodePort, kubeProxyLagTimeout, path) + } + + for nodeName, nodeIPs := range endpointNodeMap { + By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0])) + var body bytes.Buffer + var result bool + var err error + if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { + result, err = testReachableHTTPWithContent(nodeIPs[0], healthCheckNodePort, "/healthz", "", &body) + return !result, nil + }); pollErr != nil { + framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. Last err %v, last body %v", + nodeName, healthCheckNodePort, err, body.String()) + } + } + + // Poll till kube-proxy re-adds the MASQUERADE rule on the node. 
+ By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP)) + pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) { + content := jig.GetHTTPContent(ingressIP, svcTCPPort, kubeProxyLagTimeout, "/clientip") + clientIP := content.String() + if strings.HasPrefix(clientIP, "10.") { + return true, nil + } + return false, fmt.Errorf("Source IP (%v) is the client IP, expected a ten-dot cluster ip.", clientIP) + }) + if pollErr != nil { + framework.Failf("Source IP WAS preserved even after ESIPP turned off: %v", pollErr) + } + + // TODO: We need to attempt to create another service with the previously + // allocated healthcheck nodePort. If the health check nodePort has been + // freed, the new service creation will succeed, upon which we cleanup. + // If the health check nodePort has NOT been freed, the new service + // creation will fail. + + By("turning ESIPP annotation back on") + svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *api.Service) { + svc.ObjectMeta.Annotations[service.BetaAnnotationExternalTraffic] = + service.AnnotationValueExternalTrafficLocal + // Request the same healthCheckNodePort as before, to test the user-requested allocation path + svc.ObjectMeta.Annotations[service.BetaAnnotationHealthCheckNodePort] = + fmt.Sprintf("%d", healthCheckNodePort) + }) + pollErr = wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) { + content := jig.GetHTTPContent(ingressIP, svcTCPPort, kubeProxyLagTimeout, "/clientip") + clientIP := content.String() + if !strings.HasPrefix(clientIP, "10.") { + return true, nil + } + return false, fmt.Errorf("Source IP (%v) is not the client IP, expected a public IP.", clientIP) + }) + if pollErr != nil { + framework.Failf("Source IP was not preserved when the ESIPP annotation was on: %v", pollErr) } }) }) @@ -1372,16 +1593,20 @@ func deletePodOrFail(c *client.Client, ns, name string) { Expect(err).NotTo(HaveOccurred()) } +func getNodeAddresses(node *api.Node, addressType api.NodeAddressType) (ips []string) { + for j := range node.Status.Addresses { + nodeAddress := &node.Status.Addresses[j] + if nodeAddress.Type == addressType { + ips = append(ips, nodeAddress.Address) + } + } + return +} + func collectAddresses(nodes *api.NodeList, addressType api.NodeAddressType) []string { ips := []string{} for i := range nodes.Items { - item := &nodes.Items[i] - for j := range item.Status.Addresses { - nodeAddress := &item.Status.Addresses[j] - if nodeAddress.Type == addressType { - ips = append(ips, nodeAddress.Address) - } - } + ips = append(ips, getNodeAddresses(&nodes.Items[i], addressType)...) } return ips } @@ -1855,6 +2080,128 @@ func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc return result } +func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType api.ServiceType, timeout time.Duration) { + ingressIP := "" + svc := j.UpdateServiceOrFail(namespace, name, func(s *api.Service) { + for _, ing := range s.Status.LoadBalancer.Ingress { + if ing.IP != "" { + ingressIP = ing.IP + } + } + s.Spec.Type = newType + s.Spec.Ports[0].NodePort = 0 + }) + if ingressIP != "" { + j.WaitForLoadBalancerDestroyOrFail(namespace, svc.Name, ingressIP, int(svc.Spec.Ports[0].Port), timeout) + } +} + +// createOnlyLocalNodePortService creates a loadbalancer service and sanity checks its +// nodePort. If createPod is true, it also creates an RC with 1 replica of +// the standard netexec container used everywhere in this test. 
+func (j *ServiceTestJig) createOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *api.Service { + By("creating a service " + namespace + "/" + namespace + " with type=NodePort and annotation for local-traffic-only") + svc := j.CreateTCPServiceOrFail(namespace, func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeNodePort + svc.ObjectMeta.Annotations = map[string]string{ + service.AlphaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} + svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}} + }) + + if createPod { + By("creating a pod to be part of the service " + serviceName) + j.RunOrFail(namespace, nil) + } + j.SanityCheckService(svc, api.ServiceTypeNodePort) + return svc +} + +// createOnlyLocalLoadBalancerService creates a loadbalancer service and waits for it to +// acquire an ingress IP. If createPod is true, it also creates an RC with 1 +// replica of the standard netexec container used everywhere in this test. +func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool) *api.Service { + By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only") + svc := j.CreateTCPServiceOrFail(namespace, func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeLoadBalancer + // We need to turn affinity off for our LB distribution tests + svc.Spec.SessionAffinity = api.ServiceAffinityNone + svc.ObjectMeta.Annotations = map[string]string{ + service.AlphaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} + svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}} + }) + + if createPod { + By("creating a pod to be part of the service " + serviceName) + j.RunOrFail(namespace, nil) + } + By("waiting for loadbalancer for service " + namespace + "/" + serviceName) + svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout) + j.SanityCheckService(svc, api.ServiceTypeLoadBalancer) + return svc +} + +// getEndpointNodes returns a map of nodenames:external-ip on which the +// endpoints of the given Service are running. +func (j *ServiceTestJig) getEndpointNodes(svc *api.Service) map[string][]string { + nodes := j.getNodes(maxNodesForEndpointsTests) + endpoints, err := j.Client.Endpoints(svc.Namespace).Get(svc.Name) + if err != nil { + framework.Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err) + } + if len(endpoints.Subsets) == 0 { + framework.Failf("Endpoint has no subsets, cannot determine node addresses.") + } + epNodes := sets.NewString() + for _, ss := range endpoints.Subsets { + for _, e := range ss.Addresses { + if e.NodeName != nil { + epNodes.Insert(*e.NodeName) + } + } + } + nodeMap := map[string][]string{} + for _, n := range nodes.Items { + if epNodes.Has(n.Name) { + nodeMap[n.Name] = getNodeAddresses(&n, api.NodeExternalIP) + } + } + return nodeMap +} + +// getNodes returns the first maxNodesForTest nodes. Useful in large clusters +// where we don't eg: want to create an endpoint per node. 
+func (j *ServiceTestJig) getNodes(maxNodesForTest int) (nodes *api.NodeList) { + nodes = framework.GetReadySchedulableNodesOrDie(j.Client) + if len(nodes.Items) <= maxNodesForTest { + maxNodesForTest = len(nodes.Items) + } + nodes.Items = nodes.Items[:maxNodesForTest] + return nodes +} + +func (j *ServiceTestJig) waitForEndpointOnNode(namespace, serviceName, nodeName string) { + err := wait.PollImmediate(framework.Poll, loadBalancerCreateTimeoutDefault, func() (bool, error) { + endpoints, err := j.Client.Endpoints(namespace).Get(serviceName) + if err != nil { + framework.Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err) + return false, nil + } + // TODO: Handle multiple endpoints + if len(endpoints.Subsets[0].Addresses) == 0 { + framework.Logf("Expected Ready endpoints - found none") + return false, nil + } + epHostName := *endpoints.Subsets[0].Addresses[0].NodeName + framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName) + if epHostName != nodeName { + framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName) + return false, nil + } + return true, nil + }) + framework.ExpectNoError(err) +} + func (j *ServiceTestJig) SanityCheckService(svc *api.Service, svcType api.ServiceType) { if svc.Spec.Type != svcType { framework.Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType) @@ -1900,7 +2247,6 @@ func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*api. if err != nil { return nil, fmt.Errorf("Failed to get Service %q: %v", name, err) } - update(service) service, err = j.Client.Services(namespace).Update(service) if err == nil { @@ -2020,25 +2366,32 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { var body bytes.Buffer - if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTPWithContent(host, port, url, "", &body) }); err != nil { - framework.Failf("Could not reach HTTP service through %v:%v/%v after %v: %v", host, port, url, timeout, err) - return body + var err error + if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { + result, err := testReachableHTTPWithContent(host, port, url, "", &body) + if err != nil { + framework.Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err) + return false, nil + } + return result, nil + }); pollErr != nil { + framework.Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err) } return body } -func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, expectedSuccess bool) { - success, err := testHTTPHealthCheckNodePort(host, port, request) - if expectedSuccess && success { - framework.Logf("HealthCheck successful for node %v:%v, as expected", host, port) - return - } else if !expectedSuccess && (!success || err != nil) { - framework.Logf("HealthCheck failed for node %v:%v, as expected", host, port) - return - } else if expectedSuccess { - framework.Failf("HealthCheck NodePort incorrectly reporting unhealthy on %v:%v: %v", host, port, err) +func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, tries int) (pass, fail int, statusMsg string) { + for i := 0; i < tries; i++ { + success, err := testHTTPHealthCheckNodePort(host, port, request) + if success { + pass++ + } else 
{ + fail++ + } + statusMsg += fmt.Sprintf("\nAttempt %d Error %v", i, err) + time.Sleep(1 * time.Second) } - framework.Failf("Unexpected HealthCheck NodePort still reporting healthy %v:%v: %v", host, port, err) + return pass, fail, statusMsg } func getIngressPoint(ing *api.LoadBalancerIngress) string { @@ -2310,26 +2663,21 @@ func (j *ServiceTestJig) launchEchoserverPodOnNode(f *framework.Framework, nodeN framework.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name) } -func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, serviceIp string, servicePort int) (string, string) { - framework.Logf("Creating an exec pod on the same node") +func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, serviceIP string, servicePort int) (string, string) { + framework.Logf("Creating an exec pod on node %v", nodeName) execPodName := createExecPodOnNode(f.Client, ns, nodeName, fmt.Sprintf("execpod-sourceip-%s", nodeName)) defer func() { framework.Logf("Cleaning up the exec pod") err := c.Pods(ns).Delete(execPodName, nil) Expect(err).NotTo(HaveOccurred()) }() - podClient := f.Client.Pods(ns) - execPod, err := podClient.Get(execPodName) + execPod, err := f.Client.Pods(ns).Get(execPodName) ExpectNoError(err) - execPodIp := execPod.Status.PodIP - framework.Logf("Exec pod ip: %s", execPodIp) - framework.Logf("Getting echo response from service") var stdout string timeout := 2 * time.Minute - framework.Logf("Waiting up to %v for sourceIp test to be executed", timeout) - cmd := fmt.Sprintf(`wget -T 30 -qO- %s:%d | grep client_address`, serviceIp, servicePort) - // Need timeout mechanism because it may takes more times for iptables to be populated. + framework.Logf("Waiting up to %v wget %s:%d", timeout, serviceIP, servicePort) + cmd := fmt.Sprintf(`wget -T 30 -qO- %s:%d | grep client_address`, serviceIP, servicePort) for start := time.Now(); time.Since(start) < timeout; time.Sleep(2) { stdout, err = framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) if err != nil { @@ -2349,12 +2697,9 @@ func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, se // The stdout return from RunHostCmd seems to come with "\n", so TrimSpace is needed. // Desired stdout in this format: client_address=x.x.x.x outputs := strings.Split(strings.TrimSpace(stdout), "=") - sourceIp := "" if len(outputs) != 2 { // Fail the test if output format is unexpected. framework.Failf("exec pod returned unexpected stdout format: [%v]\n", stdout) - } else { - sourceIp = outputs[1] } - return execPodIp, sourceIp + return execPod.Status.PodIP, outputs[1] } From 243bd5743ba39382820663c3230137afbcb3131a Mon Sep 17 00:00:00 2001 From: bprashanth Date: Tue, 11 Oct 2016 15:16:38 -0700 Subject: [PATCH 4/5] Flip the beta flag --- pkg/util/config/feature_gate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/config/feature_gate.go b/pkg/util/config/feature_gate.go index c8782c4c421..00c2f4e6f9e 100644 --- a/pkg/util/config/feature_gate.go +++ b/pkg/util/config/feature_gate.go @@ -49,7 +49,7 @@ var ( // represented here. 
knownFeatures = map[string]featureSpec{ allAlphaGate: {false, alpha}, - externalTrafficLocalOnly: {false, alpha}, + externalTrafficLocalOnly: {true, beta}, appArmor: {true, beta}, dynamicKubeletConfig: {false, alpha}, dynamicVolumeProvisioning: {true, alpha}, From 5029bb06e97be34c371dead5452968249bebff8f Mon Sep 17 00:00:00 2001 From: bprashanth Date: Tue, 18 Oct 2016 18:43:13 -0700 Subject: [PATCH 5/5] Validation --- pkg/api/validation/validation.go | 69 ++++++++++++++++++++++++++- pkg/api/validation/validation_test.go | 45 +++++++++++++++++ pkg/registry/core/service/rest.go | 13 ++--- test/e2e/service.go | 8 ++-- 4 files changed, 121 insertions(+), 14 deletions(-) diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 396f3b4017f..aa9ef469ae7 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -2385,8 +2385,15 @@ var supportedSessionAffinityType = sets.NewString(string(api.ServiceAffinityClie var supportedServiceType = sets.NewString(string(api.ServiceTypeClusterIP), string(api.ServiceTypeNodePort), string(api.ServiceTypeLoadBalancer), string(api.ServiceTypeExternalName)) -// ValidateService tests if required fields in the service are set. +// ValidateService tests if required fields/annotations of a Service are valid. func ValidateService(service *api.Service) field.ErrorList { + allErrs := validateServiceFields(service) + allErrs = append(allErrs, validateServiceAnnotations(service, nil)...) + return allErrs +} + +// validateServiceFields tests if required fields in the service are set. +func validateServiceFields(service *api.Service) field.ErrorList { allErrs := ValidateObjectMeta(&service.ObjectMeta, true, ValidateServiceName, field.NewPath("metadata")) specPath := field.NewPath("spec") @@ -2567,6 +2574,63 @@ func validateServicePort(sp *api.ServicePort, requireName, isHeadlessService boo return allErrs } +func validateServiceAnnotations(service *api.Service, oldService *api.Service) (allErrs field.ErrorList) { + // 2 annotations went from alpha to beta in 1.5: healthcheck-nodeport and + // external-traffic. The user cannot mix these. All updates to the alpha + // annotation are disallowed. The user must change both alpha annotations + // to beta before making any modifications, even though the system continues + // to respect the alpha version. 
+ hcAlpha, healthCheckAlphaOk := service.Annotations[apiservice.AlphaAnnotationHealthCheckNodePort] + onlyLocalAlpha, onlyLocalAlphaOk := service.Annotations[apiservice.AlphaAnnotationExternalTraffic] + + _, healthCheckBetaOk := service.Annotations[apiservice.BetaAnnotationHealthCheckNodePort] + _, onlyLocalBetaOk := service.Annotations[apiservice.BetaAnnotationExternalTraffic] + + var oldHealthCheckAlpha, oldOnlyLocalAlpha string + var oldHealthCheckAlphaOk, oldOnlyLocalAlphaOk bool + if oldService != nil { + oldHealthCheckAlpha, oldHealthCheckAlphaOk = oldService.Annotations[apiservice.AlphaAnnotationHealthCheckNodePort] + oldOnlyLocalAlpha, oldOnlyLocalAlphaOk = oldService.Annotations[apiservice.AlphaAnnotationExternalTraffic] + } + hcValueChanged := oldHealthCheckAlphaOk && healthCheckAlphaOk && oldHealthCheckAlpha != hcAlpha + hcValueNew := !oldHealthCheckAlphaOk && healthCheckAlphaOk + hcValueGone := !healthCheckAlphaOk && !healthCheckBetaOk && oldHealthCheckAlphaOk + onlyLocalHCMismatch := onlyLocalBetaOk && healthCheckAlphaOk + + // On upgrading to a 1.5 cluster, the user is locked in at the current + // alpha setting, till they modify the Service such that the pair of + // annotations are both beta. Basically this means we need to: + // Disallow updates to the alpha annotation. + // Disallow creating a Service with the alpha annotation. + // Disallow removing both alpha annotations. Removing the health-check + // annotation is rejected at a later stage anyway, so if we allow removing + // just onlyLocal we might leak the port. + // Disallow a single field from transitioning to beta. Mismatched annotations + // cause confusion. + // Ignore changes to the fields if they're both transitioning to beta. + // Allow modifications to Services in fields other than the alpha annotation. 
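A condensed illustration of the transition rules spelled out above, drawn from the ValidateServiceUpdate test cases added later in this series (the "3001" port value comes from those tests; oldSvc/newSvc construction is elided, as in the table-driven tests):

	// Allowed: both annotations move from alpha to beta together, values preserved.
	oldSvc.Annotations[service.AlphaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
	oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] = "3001"
	newSvc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
	newSvc.Annotations[service.BetaAnnotationHealthCheckNodePort] = "3001"
	// ValidateServiceUpdate(newSvc, oldSvc) -> no errors

	// Denied: promoting only one of the pair to beta, changing an alpha value in
	// place, or dropping the alpha annotations without adding the beta ones; each
	// case yields a field.Invalid error that points the user at the corresponding
	// beta annotation key.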
+ + if hcValueNew || hcValueChanged || hcValueGone || onlyLocalHCMismatch { + fieldPath := field.NewPath("metadata", "annotations").Key(apiservice.AlphaAnnotationHealthCheckNodePort) + msg := fmt.Sprintf("please replace the alpha annotation with the beta version %v", + apiservice.BetaAnnotationHealthCheckNodePort) + allErrs = append(allErrs, field.Invalid(fieldPath, apiservice.AlphaAnnotationHealthCheckNodePort, msg)) + } + + onlyLocalValueChanged := oldOnlyLocalAlphaOk && onlyLocalAlphaOk && oldOnlyLocalAlpha != onlyLocalAlpha + onlyLocalValueNew := !oldOnlyLocalAlphaOk && onlyLocalAlphaOk + onlyLocalValueGone := !onlyLocalAlphaOk && !onlyLocalBetaOk && oldOnlyLocalAlphaOk + hcOnlyLocalMismatch := onlyLocalAlphaOk && healthCheckBetaOk + + if onlyLocalValueNew || onlyLocalValueChanged || onlyLocalValueGone || hcOnlyLocalMismatch { + fieldPath := field.NewPath("metadata", "annotations").Key(apiservice.AlphaAnnotationExternalTraffic) + msg := fmt.Sprintf("please replace the alpha annotation with the beta version %v", + apiservice.BetaAnnotationExternalTraffic) + allErrs = append(allErrs, field.Invalid(fieldPath, apiservice.AlphaAnnotationExternalTraffic, msg)) + } + return +} + // ValidateServiceUpdate tests if required fields in the service are set during an update func ValidateServiceUpdate(service, oldService *api.Service) field.ErrorList { allErrs := ValidateObjectMetaUpdate(&service.ObjectMeta, &oldService.ObjectMeta, field.NewPath("metadata")) @@ -2578,7 +2642,8 @@ func ValidateServiceUpdate(service, oldService *api.Service) field.ErrorList { // TODO(freehan): allow user to update loadbalancerSourceRanges allErrs = append(allErrs, ValidateImmutableField(service.Spec.LoadBalancerSourceRanges, oldService.Spec.LoadBalancerSourceRanges, field.NewPath("spec", "loadBalancerSourceRanges"))...) - allErrs = append(allErrs, ValidateService(service)...) + allErrs = append(allErrs, validateServiceFields(service)...) + allErrs = append(allErrs, validateServiceAnnotations(service, oldService)...) 
return allErrs } diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 9d11bea90fa..d9461071670 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -5207,6 +5207,13 @@ func TestValidateService(t *testing.T) { }, numErrs: 1, }, + { + name: "LoadBalancer disallows onlyLocal alpha annotations", + tweakSvc: func(s *api.Service) { + s.Annotations[service.AlphaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal + }, + numErrs: 1, + }, } for _, tc := range testCases { @@ -6474,6 +6481,44 @@ func TestValidateServiceUpdate(t *testing.T) { }, numErrs: 1, }, + { + name: "Service disallows removing one onlyLocal alpha annotation", + tweakSvc: func(oldSvc, newSvc *api.Service) { + oldSvc.Annotations[service.AlphaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal + oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] = "3001" + }, + numErrs: 2, + }, + { + name: "Service disallows modifying onlyLocal alpha annotations", + tweakSvc: func(oldSvc, newSvc *api.Service) { + oldSvc.Annotations[service.AlphaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal + oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] = "3001" + newSvc.Annotations[service.AlphaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficGlobal + newSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] = oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] + }, + numErrs: 1, + }, + { + name: "Service disallows promoting one of the onlyLocal pair to beta", + tweakSvc: func(oldSvc, newSvc *api.Service) { + oldSvc.Annotations[service.AlphaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal + oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] = "3001" + newSvc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficGlobal + newSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] = oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] + }, + numErrs: 1, + }, + { + name: "Service allows changing both onlyLocal annotations from alpha to beta", + tweakSvc: func(oldSvc, newSvc *api.Service) { + oldSvc.Annotations[service.AlphaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal + oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] = "3001" + newSvc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal + newSvc.Annotations[service.BetaAnnotationHealthCheckNodePort] = oldSvc.Annotations[service.AlphaAnnotationHealthCheckNodePort] + }, + numErrs: 0, + }, } for _, tc := range testCases { diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/rest.go index 10ef74d8f62..bcb14fa5584 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/rest.go @@ -352,14 +352,11 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service) (boo } case oldServiceHasHealthCheckNodePort && assignHealthCheckNodePort: - for _, annotation := range []string{ - apiservice.AlphaAnnotationHealthCheckNodePort, apiservice.BetaAnnotationHealthCheckNodePort} { - if _, ok := service.Annotations[annotation]; !ok { - glog.Warningf("Attempt to delete health check node port annotation DENIED") - el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), - annotation, "Cannot delete healthcheck nodePort annotation")} - return false, 
errors.NewInvalid(api.Kind("Service"), service.Name, el) - } + if _, ok := service.Annotations[apiservice.BetaAnnotationHealthCheckNodePort]; !ok { + glog.Warningf("Attempt to delete health check node port annotation DENIED") + el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), + apiservice.BetaAnnotationHealthCheckNodePort, "Cannot delete healthcheck nodePort annotation")} + return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) } if oldHealthCheckNodePort != requestedHealthCheckNodePort { glog.Warningf("Attempt to change value of health check node port annotation DENIED") diff --git a/test/e2e/service.go b/test/e2e/service.go index a94d0c7d156..218061cbf96 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -1095,7 +1095,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow][Feature:ExternalTrafficLocalOnly]", c = f.Client cs = f.ClientSet - if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber { + if nodes := framework.GetReadySchedulableNodesOrDie(cs); len(nodes.Items) > largeClusterMinNodesNumber { loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge } }) @@ -2104,7 +2104,7 @@ func (j *ServiceTestJig) createOnlyLocalNodePortService(namespace, serviceName s svc := j.CreateTCPServiceOrFail(namespace, func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeNodePort svc.ObjectMeta.Annotations = map[string]string{ - service.AlphaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}} }) @@ -2126,7 +2126,7 @@ func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceNa // We need to turn affinity off for our LB distribution tests svc.Spec.SessionAffinity = api.ServiceAffinityNone svc.ObjectMeta.Annotations = map[string]string{ - service.AlphaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}} }) @@ -2171,7 +2171,7 @@ func (j *ServiceTestJig) getEndpointNodes(svc *api.Service) map[string][]string // getNodes returns the first maxNodesForTest nodes. Useful in large clusters // where we don't eg: want to create an endpoint per node. func (j *ServiceTestJig) getNodes(maxNodesForTest int) (nodes *api.NodeList) { - nodes = framework.GetReadySchedulableNodesOrDie(j.Client) + nodes = framework.GetReadySchedulableNodesOrDie(j.ClientSet) if len(nodes.Items) <= maxNodesForTest { maxNodesForTest = len(nodes.Items) }
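For reference, the contract these health check e2e assertions rely on can be probed with a plain HTTP GET, which is essentially what the jig helper does. A minimal, hypothetical sketch (the function name and its nodeIP/healthCheckNodePort parameters are placeholders, and it assumes the standard fmt, net/http, and time imports):

	// probeHealthCheckNodePort mirrors what the jig's health check helper is
	// expected to do: hit kube-proxy's /healthz responder on the service's
	// healthCheckNodePort. For an OnlyLocal service, kube-proxy should report
	// healthy (HTTP 200) only on nodes that have a local endpoint of the
	// service; all other nodes are expected to fail the check.
	func probeHealthCheckNodePort(nodeIP string, healthCheckNodePort int) (bool, error) {
		url := fmt.Sprintf("http://%s:%d/healthz", nodeIP, healthCheckNodePort)
		client := &http.Client{Timeout: 5 * time.Second}
		resp, err := client.Get(url)
		if err != nil {
			return false, err
		}
		defer resp.Body.Close()
		return resp.StatusCode == http.StatusOK, nil
	}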