diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index fb0a4b20a19..6e076a2d727 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -187,7 +187,7 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { *p = types[c.Rand.Intn(len(types))] }, func(p *api.ServiceType, c fuzz.Continue) { - types := []api.ServiceType{api.ServiceTypeClusterIP, api.ServiceTypeLoadBalancer} + types := []api.ServiceType{api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer} *p = types[c.Rand.Intn(len(types))] }, func(ct *api.Container, c fuzz.Continue) { diff --git a/pkg/api/types.go b/pkg/api/types.go index e545987084f..85f661260bd 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1019,6 +1019,10 @@ const ( // cluster, via the portal IP. ServiceTypeClusterIP ServiceType = "ClusterIP" + // ServiceTypeNodePort means a service will be exposed on one port of + // every node, in addition to 'ClusterIP' visibility. + ServiceTypeNodePort ServiceType = "NodePort" + // ServiceTypeLoadBalancer means a service will be exposed via an // external load balancer (if the cloud provider supports it), in addition // to 'NodePort' type. diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index cb37158fef8..815ab591714 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -1001,6 +1001,10 @@ const ( // cluster, via the portal IP. ServiceTypeClusterIP ServiceType = "ClusterIP" + // ServiceTypeNodePort means a service will be exposed on one port of + // every node, in addition to 'ClusterIP' visibility. + ServiceTypeNodePort ServiceType = "NodePort" + // ServiceTypeLoadBalancer means a service will be exposed via an // external load balancer (if the cloud provider supports it), in addition // to 'NodePort' type. diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 139173aba64..815fc1b90b2 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -843,6 +843,10 @@ const ( // cluster, via the portal IP. ServiceTypeClusterIP ServiceType = "ClusterIP" + // ServiceTypeNodePort means a service will be exposed on one port of + // every node, in addition to 'ClusterIP' visibility. + ServiceTypeNodePort ServiceType = "NodePort" + // ServiceTypeLoadBalancer means a service will be exposed via an // external load balancer (if the cloud provider supports it), in addition // to 'NodePort' type. diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 1fef9c353fb..06c2998a467 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -845,6 +845,10 @@ const ( // cluster, via the portal IP. ServiceTypeClusterIP ServiceType = "ClusterIP" + // ServiceTypeNodePort means a service will be exposed on one port of + // every node, in addition to 'ClusterIP' visibility. + ServiceTypeNodePort ServiceType = "NodePort" + // ServiceTypeLoadBalancer means a service will be exposed via an // external load balancer (if the cloud provider supports it), in addition // to 'NodePort' type. diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 3f3125f1ae7..27df9272ed5 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -1005,6 +1005,10 @@ const ( // cluster, via the portal IP. ServiceTypeClusterIP ServiceType = "ClusterIP" + // ServiceTypeNodePort means a service will be exposed on one port of + // every node, in addition to 'ClusterIP' visibility. + ServiceTypeNodePort ServiceType = "NodePort" + // ServiceTypeLoadBalancer means a service will be exposed via an // external load balancer (if the cloud provider supports it), in addition // to 'NodePort' type. diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 8012cca7052..771cad2c4e8 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -1032,7 +1032,7 @@ func ValidatePodTemplateUpdate(newPod, oldPod *api.PodTemplate) errs.ValidationE } var supportedSessionAffinityType = util.NewStringSet(string(api.ServiceAffinityClientIP), string(api.ServiceAffinityNone)) -var supportedServiceType = util.NewStringSet(string(api.ServiceTypeClusterIP), +var supportedServiceType = util.NewStringSet(string(api.ServiceTypeClusterIP), string(api.ServiceTypeNodePort), string(api.ServiceTypeLoadBalancer)) // ValidateService tests if required fields in the service are set. @@ -1086,6 +1086,14 @@ func ValidateService(service *api.Service) errs.ValidationErrorList { } } + if service.Spec.Type == api.ServiceTypeClusterIP { + for i := range service.Spec.Ports { + if service.Spec.Ports[i].NodePort != 0 { + allErrs = append(allErrs, errs.NewFieldInvalid("spec.ports", service.Spec.Ports[i], "cannot specify a node port with cluster-visibility services")) + } + } + } + // Check for duplicate NodePorts, considering (protocol,port) pairs nodePorts := make(map[api.ServicePort]bool) for i := range service.Spec.Ports { @@ -1103,11 +1111,6 @@ func ValidateService(service *api.Service) errs.ValidationErrorList { nodePorts[key] = true } - // Temporary validation to prevent people creating NodePorts before we have the full infrastructure in place - if len(nodePorts) > 0 { - allErrs = append(allErrs, errs.NewFieldInvalid("spec.ports", service.Spec.Ports[0], "nodePorts not (yet) enabled")) - } - return allErrs } @@ -1373,7 +1376,7 @@ func ValidateSecret(secret *api.Secret) errs.ValidationErrorList { allErrs = append(allErrs, errs.NewFieldRequired(fmt.Sprintf("metadata.annotations[%s]", api.ServiceAccountNameKey))) } case api.SecretTypeOpaque, "": - // no-op + // no-op case api.SecretTypeDockercfg: dockercfgBytes, exists := secret.Data[api.DockerConfigKey] if !exists { diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 47a74bb03e7..a2922b48280 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -1691,14 +1691,14 @@ func TestValidateService(t *testing.T) { numErrs: 0, }, { - name: "valid visbility - cluster", + name: "valid type - cluster", tweakSvc: func(s *api.Service) { s.Spec.Type = api.ServiceTypeClusterIP }, numErrs: 0, }, { - name: "valid visbility - loadbalancer", + name: "valid type - loadbalancer", tweakSvc: func(s *api.Service) { s.Spec.Type = api.ServiceTypeLoadBalancer }, @@ -1723,18 +1723,106 @@ func TestValidateService(t *testing.T) { { name: "duplicate nodeports", tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeNodePort s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1}) s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "r", Port: 2, Protocol: "TCP", NodePort: 1}) }, - numErrs: 2, // TODO(justinsb): change to 1 when NodePorts enabled + numErrs: 1, }, { name: "duplicate nodeports (different protocols)", tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeNodePort s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1}) s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "r", Port: 2, Protocol: "UDP", NodePort: 1}) }, - numErrs: 1, // TODO(justinsb): change to 0 when NodePorts enabled + numErrs: 0, + }, + { + name: "valid type - cluster", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeClusterIP + }, + numErrs: 0, + }, + { + name: "valid type - nodeport", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeNodePort + }, + numErrs: 0, + }, + { + name: "valid type - loadbalancer", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeLoadBalancer + }, + numErrs: 0, + }, + { + name: "valid type loadbalancer 2 ports", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeLoadBalancer + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"}) + }, + numErrs: 0, + }, + { + name: "valid type loadbalancer with NodePort", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeLoadBalancer + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345}) + }, + numErrs: 0, + }, + { + name: "valid type=NodePort service with NodePort", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeNodePort + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345}) + }, + numErrs: 0, + }, + { + name: "valid type=NodePort service without NodePort", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeNodePort + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"}) + }, + numErrs: 0, + }, + { + name: "valid cluster service without NodePort", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeClusterIP + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"}) + }, + numErrs: 0, + }, + { + name: "invalid cluster service with NodePort", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeClusterIP + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345}) + }, + numErrs: 1, + }, + { + name: "invalid public service with duplicate NodePort", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeNodePort + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p1", Port: 1, Protocol: "TCP", NodePort: 1}) + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p2", Port: 2, Protocol: "TCP", NodePort: 1}) + }, + numErrs: 1, + }, + { + name: "valid type=LoadBalancer", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeLoadBalancer + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"}) + }, + numErrs: 0, }, } @@ -2511,6 +2599,13 @@ func TestValidateServiceUpdate(t *testing.T) { }, numErrs: 1, }, + { + name: "change type -> nodeport", + tweakSvc: func(oldSvc, newSvc *api.Service) { + newSvc.Spec.Type = api.ServiceTypeNodePort + }, + numErrs: 0, + }, } for _, tc := range testCases { diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go index f7a1513100e..d2e75bd45a8 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go @@ -145,7 +145,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { // Services do not have external load balancers: no calls should be made. services: []*api.Service{ newService("s0", "111", api.ServiceTypeClusterIP), - newService("s1", "222", api.ServiceTypeClusterIP), + newService("s1", "222", api.ServiceTypeNodePort), }, expectedUpdateCalls: nil, }, @@ -174,7 +174,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { { // Two services have an external load balancer and two don't: two calls. services: []*api.Service{ - newService("s0", "777", api.ServiceTypeClusterIP), + newService("s0", "777", api.ServiceTypeNodePort), newService("s1", "888", api.ServiceTypeLoadBalancer), newService("s3", "999", api.ServiceTypeLoadBalancer), newService("s4", "123", api.ServiceTypeClusterIP), diff --git a/pkg/kubectl/service_test.go b/pkg/kubectl/service_test.go index e3c7682ef85..deaa87777a0 100644 --- a/pkg/kubectl/service_test.go +++ b/pkg/kubectl/service_test.go @@ -180,6 +180,67 @@ func TestGenerateService(t *testing.T) { }, }, }, + { + params: map[string]string{ + "selector": "foo=bar,baz=blah", + "name": "test", + "port": "80", + "protocol": "UDP", + "container-port": "foobar", + "type": string(api.ServiceTypeNodePort), + }, + expected: api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "test", + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{ + "foo": "bar", + "baz": "blah", + }, + Ports: []api.ServicePort{ + { + Name: "default", + Port: 80, + Protocol: "UDP", + TargetPort: util.NewIntOrStringFromString("foobar"), + }, + }, + Type: api.ServiceTypeNodePort, + }, + }, + }, + { + params: map[string]string{ + "selector": "foo=bar,baz=blah", + "name": "test", + "port": "80", + "protocol": "UDP", + "container-port": "foobar", + "create-external-load-balancer": "true", // ignored when type is present + "type": string(api.ServiceTypeNodePort), + }, + expected: api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "test", + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{ + "foo": "bar", + "baz": "blah", + }, + Ports: []api.ServicePort{ + { + Name: "default", + Port: 80, + Protocol: "UDP", + TargetPort: util.NewIntOrStringFromString("foobar"), + }, + }, + Type: api.ServiceTypeNodePort, + }, + }, + }, } generator := ServiceGenerator{} for _, test := range tests { diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index eeef23aeb4d..70530d7f0ad 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -287,10 +287,9 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { } info.portalIP = serviceIP info.portalPort = servicePort.Port - // TODO(justinsb): switch to servicePort.NodePort when that lands - info.nodePort = 0 // Deep-copy in case the service instance changes info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) + info.nodePort = servicePort.NodePort info.sessionAffinityType = service.Spec.SessionAffinity glog.V(4).Infof("info: %+v", info) @@ -319,8 +318,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { } func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - // TODO(justinsb): switch to port.NodePort when that lands - if info.protocol != port.Protocol || info.portalPort != port.Port || info.nodePort != 0 /*port.NodePort*/ { + if info.protocol != port.Protocol || info.portalPort != port.Port || info.nodePort != port.NodePort { return false } if !info.portalIP.Equal(net.ParseIP(service.Spec.PortalIP)) { diff --git a/pkg/registry/service/portallocator/controller/repair.go b/pkg/registry/service/portallocator/controller/repair.go index ea760f6bfc0..5b8753b6d13 100644 --- a/pkg/registry/service/portallocator/controller/repair.go +++ b/pkg/registry/service/portallocator/controller/repair.go @@ -75,10 +75,9 @@ func (c *Repair) RunOnce() error { } r := portallocator.NewPortAllocator(c.portRange) - for _, svc := range list.Items { - ports := []int{} - - // TODO(justinsb): Collect NodePorts + for i := range list.Items { + svc := &list.Items[i] + ports := service.CollectServiceNodePorts(svc) if len(ports) == 0 { continue } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 8e7c23982bf..c61169d4c27 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -153,7 +153,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { rs.portals.Release(net.ParseIP(service.Spec.PortalIP)) } - for _, nodePort := range collectServiceNodePorts(service) { + for _, nodePort := range CollectServiceNodePorts(service) { err := rs.serviceNodePorts.Release(nodePort) if err != nil { // these should be caught by an eventual reconciliation / restart @@ -223,7 +223,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo assignNodePorts := shouldAssignNodePorts(service) - oldNodePorts := collectServiceNodePorts(oldService) + oldNodePorts := CollectServiceNodePorts(oldService) newNodePorts := []int{} if assignNodePorts { @@ -328,7 +328,7 @@ func contains(haystack []int, needle int) bool { return false } -func collectServiceNodePorts(service *api.Service) []int { +func CollectServiceNodePorts(service *api.Service) []int { servicePorts := []int{} for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] @@ -340,17 +340,15 @@ func collectServiceNodePorts(service *api.Service) []int { } func shouldAssignNodePorts(service *api.Service) bool { - // TODO(justinsb): Switch on service.Spec.Type - // switch service.Spec.Type { - // case api.ServiceVisibilityLoadBalancer: - // return true - // case api.ServiceVisibilityNodePort: - // return true - // case api.ServiceVisibilityCluster: - // return false - // default: - // glog.Errorf("Unknown visibility value: %v", service.Spec.Visibility) - // return false - // } - return false + switch service.Spec.Type { + case api.ServiceTypeLoadBalancer: + return true + case api.ServiceTypeNodePort: + return true + case api.ServiceTypeClusterIP: + return false + default: + glog.Errorf("Unknown service type: %v", service.Spec.Type) + return false + } } diff --git a/test/e2e/service.go b/test/e2e/service.go index 3ec0d2df08e..91492b0e557 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -29,6 +29,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -351,6 +353,103 @@ var _ = Describe("Services", func() { } }) + It("should be able to create a functioning NodePort service", func() { + serviceName := "nodeportservice-test" + ns := namespaces[0] + labels := map[string]string{ + "testid": "nodeportservice-test", + } + service := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: serviceName, + }, + Spec: api.ServiceSpec{ + Selector: labels, + Ports: []api.ServicePort{{ + Port: 80, + TargetPort: util.NewIntOrStringFromInt(80), + }}, + Type: api.ServiceTypeNodePort, + }, + } + + By("creating service " + serviceName + " with visibility=NodePort in namespace " + ns) + result, err := c.Services(ns).Create(service) + Expect(err).NotTo(HaveOccurred()) + defer func(ns, serviceName string) { // clean up when we're done + By("deleting service " + serviceName + " in namespace " + ns) + err := c.Services(ns).Delete(serviceName) + Expect(err).NotTo(HaveOccurred()) + }(ns, serviceName) + + if len(result.Spec.Ports) != 1 { + Failf("got unexpected number (%d) of Ports for NodePort service: %v", len(result.Spec.Ports), result) + } + + nodePort := result.Spec.Ports[0].NodePort + if nodePort == 0 { + Failf("got unexpected nodePort (%d) on Ports[0] for NodePort service: %v", nodePort, result) + } + + publicIps, err := getMinionPublicIps(c) + Expect(err).NotTo(HaveOccurred()) + if len(publicIps) == 0 { + Failf("got unexpected number (%d) of public IPs", len(publicIps)) + } + ip := publicIps[0] + + pod := &api.Pod{ + TypeMeta: api.TypeMeta{ + Kind: "Pod", + APIVersion: latest.Version, + }, + ObjectMeta: api.ObjectMeta{ + Name: "publicservice-test-" + string(util.NewUUID()), + Labels: labels, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "webserver", + Image: "gcr.io/google_containers/test-webserver", + }, + }, + }, + } + + By("creating pod to be part of service " + serviceName) + podClient := c.Pods(ns) + defer func() { + By("deleting pod " + pod.Name) + defer GinkgoRecover() + podClient.Delete(pod.Name, nil) + }() + if _, err := podClient.Create(pod); err != nil { + Failf("Failed to create pod %s: %v", pod.Name, err) + } + expectNoError(waitForPodRunningInNamespace(c, pod.Name, ns)) + + By("hitting the pod through the service's external IPs") + var resp *http.Response + for t := time.Now(); time.Since(t) < podStartTimeout; time.Sleep(5 * time.Second) { + resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, nodePort)) + if err == nil { + break + } + } + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) + if resp.StatusCode != 200 { + Failf("received non-success return status %q trying to access pod through public port; got body: %s", resp.Status, string(body)) + } + if !strings.Contains(string(body), "test-webserver") { + Failf("received response body without expected substring 'test-webserver': %s", string(body)) + } + }) + It("should correctly serve identically named services in different namespaces on different external IP addresses", func() { if !providerIs("gce", "gke") { By(fmt.Sprintf("Skipping service namespace collision test; uses ServiceTypeLoadBalancer, a (gce|gke) feature")) @@ -534,3 +633,30 @@ func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]s _, err := c.Pods(ns).Create(pod) Expect(err).NotTo(HaveOccurred()) } + +func collectAddresses(nodes *api.NodeList, addressType api.NodeAddressType) []string { + ips := []string{} + for i := range nodes.Items { + item := &nodes.Items[i] + for j := range item.Status.Addresses { + nodeAddress := &item.Status.Addresses[j] + if nodeAddress.Type == addressType { + ips = append(ips, nodeAddress.Address) + } + } + } + return ips +} + +func getMinionPublicIps(c *client.Client) ([]string, error) { + nodes, err := c.Nodes().List(labels.Everything(), fields.Everything()) + if err != nil { + return nil, err + } + + ips := collectAddresses(nodes, api.NodeExternalIP) + if len(ips) == 0 { + ips = collectAddresses(nodes, api.NodeLegacyHostIP) + } + return ips, nil +}