diff --git a/pkg/registry/core/service/portallocator/operation.go b/pkg/registry/core/service/portallocator/operation.go index 08d9d587815..00dd7f03cf4 100644 --- a/pkg/registry/core/service/portallocator/operation.go +++ b/pkg/registry/core/service/portallocator/operation.go @@ -28,7 +28,7 @@ package portallocator // ... // write(updatedOwner) /// op.Commit() -type portAllocationOperation struct { +type PortAllocationOperation struct { pa Interface allocated []int releaseDeferred []int @@ -36,8 +36,8 @@ type portAllocationOperation struct { } // Creates a portAllocationOperation, tracking a set of allocations & releases -func StartOperation(pa Interface) *portAllocationOperation { - op := &portAllocationOperation{} +func StartOperation(pa Interface) *PortAllocationOperation { + op := &PortAllocationOperation{} op.pa = pa op.allocated = []int{} op.releaseDeferred = []int{} @@ -46,14 +46,14 @@ func StartOperation(pa Interface) *portAllocationOperation { } // Will rollback unless marked as shouldRollback = false by a Commit(). Call from a defer block -func (op *portAllocationOperation) Finish() { +func (op *PortAllocationOperation) Finish() { if op.shouldRollback { op.Rollback() } } // (Try to) undo any operations we did -func (op *portAllocationOperation) Rollback() []error { +func (op *PortAllocationOperation) Rollback() []error { errors := []error{} for _, allocated := range op.allocated { @@ -72,7 +72,7 @@ func (op *portAllocationOperation) Rollback() []error { // (Try to) perform any deferred operations. // Note that even if this fails, we don't rollback; we always want to err on the side of over-allocation, // and Commit should be called _after_ the owner is written -func (op *portAllocationOperation) Commit() []error { +func (op *PortAllocationOperation) Commit() []error { errors := []error{} for _, release := range op.releaseDeferred { @@ -94,7 +94,7 @@ func (op *portAllocationOperation) Commit() []error { } // Allocates a port, and record it for future rollback -func (op *portAllocationOperation) Allocate(port int) error { +func (op *PortAllocationOperation) Allocate(port int) error { err := op.pa.Allocate(port) if err == nil { op.allocated = append(op.allocated, port) @@ -103,7 +103,7 @@ func (op *portAllocationOperation) Allocate(port int) error { } // Allocates a port, and record it for future rollback -func (op *portAllocationOperation) AllocateNext() (int, error) { +func (op *PortAllocationOperation) AllocateNext() (int, error) { port, err := op.pa.AllocateNext() if err == nil { op.allocated = append(op.allocated, port) @@ -112,6 +112,6 @@ func (op *portAllocationOperation) AllocateNext() (int, error) { } // Marks a port so that it will be released if this operation Commits -func (op *portAllocationOperation) ReleaseDeferred(port int) { +func (op *PortAllocationOperation) ReleaseDeferred(port int) { op.releaseDeferred = append(op.releaseDeferred, port) } diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/rest.go index a369708579d..576bc599918 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/rest.go @@ -357,58 +357,45 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest. return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs) } + // TODO: this should probably move to strategy.PrepareForCreate() + releaseServiceIP := false + defer func() { + if releaseServiceIP { + if helper.IsServiceIPSet(service) { + rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)) + } + } + }() + nodePortOp := portallocator.StartOperation(rs.serviceNodePorts) defer nodePortOp.Finish() - assignNodePorts := shouldAssignNodePorts(service) - - oldNodePorts := CollectServiceNodePorts(oldService) - - newNodePorts := []int{} - if assignNodePorts { - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - nodePort := int(servicePort.NodePort) - if nodePort != 0 { - if !contains(oldNodePorts, nodePort) { - err := nodePortOp.Allocate(nodePort) - if err != nil { - el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort, err.Error())} - return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, el) - } - } - } else { - nodePort, err = nodePortOp.AllocateNext() - if err != nil { - // TODO: what error should be returned here? It's not a - // field-level validation failure (the field is valid), and it's - // not really an internal error. - return nil, false, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) - } - servicePort.NodePort = int32(nodePort) - } - // Detect duplicate node ports; this should have been caught by validation, so we panic - if contains(newNodePorts, nodePort) { - panic("duplicate node port") - } - newNodePorts = append(newNodePorts, nodePort) + // Update service from ExternalName to non-ExternalName, should initialize ClusterIP. + if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName { + if releaseServiceIP, err = rs.initClusterIP(service); err != nil { + return nil, false, err } - } else { - // Validate should have validated that nodePort == 0 } - - // The comparison loops are O(N^2), but we don't expect N to be huge - // (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot) - for _, oldNodePort := range oldNodePorts { - if contains(newNodePorts, oldNodePort) { - continue + // Update service from non-ExternalName to ExternalName, should release ClusterIP if exists. + if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName { + if helper.IsServiceIPSet(oldService) { + rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP)) } - nodePortOp.ReleaseDeferred(oldNodePort) } - - // Remove any LoadBalancerStatus now if Type != LoadBalancer; - // although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity. + // Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists. + if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) && + (service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) { + rs.releaseNodePort(oldService, nodePortOp) + } + // Update service from any type to NodePort or LoadBalancer, should update NodePort. + if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { + if err := rs.updateNodePort(oldService, service, nodePortOp); err != nil { + return nil, false, err + } + } + // Update service from LoadBalancer to non-LoadBalancer, should remove any LoadBalancerStatus. if service.Spec.Type != api.ServiceTypeLoadBalancer { + // Although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity. service.Status.LoadBalancer = api.LoadBalancerStatus{} } @@ -425,13 +412,14 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest. } out, err := rs.registry.UpdateService(ctx, service) - if err == nil { el := nodePortOp.Commit() if el != nil { // problems should be fixed by an eventual reconciliation / restart glog.Errorf("error(s) committing NodePorts changes: %v", el) } + + releaseServiceIP = false } return out, false, err @@ -570,3 +558,82 @@ func (rs *REST) allocateHealthCheckNodePort(service *api.Service) error { } return nil } + +// The return bool value indicates if the caller should release clusterIP before return +func (rs *REST) initClusterIP(service *api.Service) (bool, error) { + switch { + case service.Spec.ClusterIP == "": + // Allocate next available. + ip, err := rs.serviceIPs.AllocateNext() + if err != nil { + // TODO: what error should be returned here? It's not a + // field-level validation failure (the field is valid), and it's + // not really an internal error. + return false, errors.NewInternalError(fmt.Errorf("failed to allocate a serviceIP: %v", err)) + } + service.Spec.ClusterIP = ip.String() + return true, nil + case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "": + // Try to respect the requested IP. + if err := rs.serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil { + // TODO: when validation becomes versioned, this gets more complicated. + el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())} + return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + return true, nil + } + + return false, nil +} + +func (rs *REST) updateNodePort(oldService, service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { + oldNodePorts := CollectServiceNodePorts(oldService) + + newNodePorts := []int{} + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + nodePort := int(servicePort.NodePort) + if nodePort != 0 { + if !contains(oldNodePorts, nodePort) { + err := nodePortOp.Allocate(nodePort) + if err != nil { + el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort, err.Error())} + return errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + } + } else { + nodePort, err := nodePortOp.AllocateNext() + if err != nil { + // TODO: what error should be returned here? It's not a + // field-level validation failure (the field is valid), and it's + // not really an internal error. + return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) + } + servicePort.NodePort = int32(nodePort) + } + // Detect duplicate node ports; this should have been caught by validation, so we panic + if contains(newNodePorts, nodePort) { + panic("duplicate node port") + } + newNodePorts = append(newNodePorts, nodePort) + } + + // The comparison loops are O(N^2), but we don't expect N to be huge + // (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot) + for _, oldNodePort := range oldNodePorts { + if contains(newNodePorts, oldNodePort) { + continue + } + nodePortOp.ReleaseDeferred(oldNodePort) + } + + return nil +} + +func (rs *REST) releaseNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) { + nodePorts := CollectServiceNodePorts(service) + + for _, nodePort := range nodePorts { + nodePortOp.ReleaseDeferred(nodePort) + } +} diff --git a/test/e2e/dns.go b/test/e2e/dns.go index 82e2de5f17a..9f78b22ecdc 100644 --- a/test/e2e/dns.go +++ b/test/e2e/dns.go @@ -225,7 +225,6 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client } func validateDNSResults(f *framework.Framework, pod *v1.Pod, fileNames []string) { - By("submitting the pod to kubernetes") podClient := f.ClientSet.Core().Pods(f.Namespace.Name) defer func() { @@ -254,7 +253,6 @@ func validateDNSResults(f *framework.Framework, pod *v1.Pod, fileNames []string) } func validateTargetedProbeOutput(f *framework.Framework, pod *v1.Pod, fileNames []string, value string) { - By("submitting the pod to kubernetes") podClient := f.ClientSet.Core().Pods(f.Namespace.Name) defer func() { @@ -424,6 +422,10 @@ var _ = framework.KubeDescribe("DNS", func() { }) It("should provide DNS for ExternalName services", func() { + // TODO(xiangpengzhao): allow AWS when pull-kubernetes-e2e-kops-aws and pull-kubernetes-e2e-gce-etcd3 + // have the same "service-cluster-ip-range". See: https://github.com/kubernetes/kubernetes/issues/47224 + framework.SkipUnlessProviderIs("gce") + // Create a test ExternalName service. By("Creating a test externalName service") serviceName := "dns-test-service-3" @@ -469,7 +471,7 @@ var _ = framework.KubeDescribe("DNS", func() { By("changing the service to type=ClusterIP") _, err = framework.UpdateService(f.ClientSet, f.Namespace.Name, serviceName, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeClusterIP - s.Spec.ClusterIP = "127.1.2.3" + s.Spec.ClusterIP = "10.0.0.123" s.Spec.Ports = []v1.ServicePort{ {Port: 80, Name: "http", Protocol: "TCP"}, } @@ -484,6 +486,6 @@ var _ = framework.KubeDescribe("DNS", func() { By("creating a third pod to probe DNS") pod3 := createDNSPod(f.Namespace.Name, wheezyProbeCmd, jessieProbeCmd, true) - validateTargetedProbeOutput(f, pod3, []string{wheezyFileName, jessieFileName}, "127.1.2.3") + validateTargetedProbeOutput(f, pod3, []string{wheezyFileName, jessieFileName}, "10.0.0.123") }) }) diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index f0bb2162ee4..d123803db80 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/retry" @@ -177,6 +178,31 @@ func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc return result } +// CreateExternalNameServiceOrFail creates a new ExternalName type Service based on the jig's defaults. +// Callers can provide a function to tweak the Service object before it is created. +func (j *ServiceTestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service { + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: j.Name, + Labels: j.Labels, + }, + Spec: v1.ServiceSpec{ + Selector: j.Labels, + ExternalName: "foo.example.com", + Type: v1.ServiceTypeExternalName, + }, + } + if tweak != nil { + tweak(svc) + } + result, err := j.Client.Core().Services(namespace).Create(svc) + if err != nil { + Failf("Failed to create ExternalName Service %q: %v", svc.Name, err) + } + return result +} + func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) { ingressIP := "" svc := j.UpdateServiceOrFail(namespace, name, func(s *v1.Service) { @@ -373,8 +399,18 @@ func (j *ServiceTestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceT if svc.Spec.Type != svcType { Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType) } + + if svcType != v1.ServiceTypeExternalName { + if svc.Spec.ExternalName != "" { + Failf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName) + } + if svc.Spec.ClusterIP != api.ClusterIPNone && svc.Spec.ClusterIP == "" { + Failf("didn't get ClusterIP for non-ExternamName service") + } + } + expectNodePorts := false - if svcType != v1.ServiceTypeClusterIP { + if svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName { expectNodePorts = true } for i, port := range svc.Spec.Ports { diff --git a/test/e2e/service.go b/test/e2e/service.go index 2165e43e039..60e8bb11406 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -789,6 +789,65 @@ var _ = framework.KubeDescribe("Services", func() { } }) + It("should be able to change the type from ExternalName to ClusterIP", func() { + serviceName := "externalname-service" + ns := f.Namespace.Name + jig := framework.NewServiceTestJig(cs, serviceName) + + By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns) + externalNameService := jig.CreateExternalNameServiceOrFail(ns, nil) + jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName) + By("changing the ExternalName service to type=ClusterIP") + clusterIPService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) { + s.Spec.Type = v1.ServiceTypeClusterIP + s.Spec.ExternalName = "" + s.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: "TCP"}, + } + }) + jig.SanityCheckService(clusterIPService, v1.ServiceTypeClusterIP) + }) + + It("should be able to change the type from ExternalName to NodePort", func() { + serviceName := "externalname-service" + ns := f.Namespace.Name + jig := framework.NewServiceTestJig(cs, serviceName) + + By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns) + externalNameService := jig.CreateExternalNameServiceOrFail(ns, nil) + jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName) + By("changing the ExternalName service to type=NodePort") + nodePortService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) { + s.Spec.Type = v1.ServiceTypeNodePort + s.Spec.ExternalName = "" + s.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: "TCP"}, + } + }) + jig.SanityCheckService(nodePortService, v1.ServiceTypeNodePort) + }) + + It("should be able to change the type from ExternalName to LoadBalancer", func() { + serviceName := "externalname-service" + ns := f.Namespace.Name + loadBalancerCreateTimeout := framework.LoadBalancerCreateTimeoutDefault + jig := framework.NewServiceTestJig(cs, serviceName) + + By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns) + externalNameService := jig.CreateExternalNameServiceOrFail(ns, nil) + jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName) + By("changing the ExternalName service to type=LoadBalancer") + loadBalancerIPService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) { + s.Spec.Type = v1.ServiceTypeLoadBalancer + s.Spec.ExternalName = "" + s.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: "TCP"}, + } + }) + loadBalancerIPService = jig.WaitForLoadBalancerOrFail(ns, loadBalancerIPService.Name, loadBalancerCreateTimeout) + jig.SanityCheckService(loadBalancerIPService, v1.ServiceTypeLoadBalancer) + }) + It("should use same NodePort with same port but different protocols", func() { serviceName := "nodeports" ns := f.Namespace.Name @@ -866,7 +925,7 @@ var _ = framework.KubeDescribe("Services", func() { } port := result.Spec.Ports[0] if port.NodePort == 0 { - framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", result) + framework.Failf("got unexpected Spec.Ports[0].NodePort for new service: %v", result) } By("creating service " + serviceName2 + " with conflicting NodePort")