From dc7fb0c9e5912c3221b6c1f2f8c3d294fd708cb6 Mon Sep 17 00:00:00 2001 From: xiangpengzhao Date: Mon, 3 Jul 2017 16:35:09 +0800 Subject: [PATCH 1/3] Use helper to init ClusterIP and NodePort in Create of service --- pkg/registry/core/service/rest.go | 114 ++++++++++++++---------------- 1 file changed, 55 insertions(+), 59 deletions(-) diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/rest.go index 9195e9c5e5e..a5a9f9626c4 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/rest.go @@ -95,71 +95,19 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, includ } }() - if helper.IsServiceIPRequested(service) { - // Allocate next available. - ip, err := rs.serviceIPs.AllocateNext() - if err != nil { - // TODO: what error should be returned here? It's not a - // field-level validation failure (the field is valid), and it's - // not really an internal error. - return nil, errors.NewInternalError(fmt.Errorf("failed to allocate a serviceIP: %v", err)) + var err error + if service.Spec.Type != api.ServiceTypeExternalName { + if releaseServiceIP, err = rs.initClusterIP(service); err != nil { + return nil, err } - service.Spec.ClusterIP = ip.String() - releaseServiceIP = true - } else if helper.IsServiceIPSet(service) { - // Try to respect the requested IP. - if err := rs.serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil { - // TODO: when validation becomes versioned, this gets more complicated. - el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())} - return nil, errors.NewInvalid(api.Kind("Service"), service.Name, el) - } - releaseServiceIP = true } nodePortOp := portallocator.StartOperation(rs.serviceNodePorts) defer nodePortOp.Finish() - assignNodePorts := shouldAssignNodePorts(service) - svcPortToNodePort := map[int]int{} - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - allocatedNodePort := svcPortToNodePort[int(servicePort.Port)] - if allocatedNodePort == 0 { - // This will only scan forward in the service.Spec.Ports list because any matches - // before the current port would have been found in svcPortToNodePort. This is really - // looking for any user provided values. - np := findRequestedNodePort(int(servicePort.Port), service.Spec.Ports) - if np != 0 { - err := nodePortOp.Allocate(np) - if err != nil { - // TODO: when validation becomes versioned, this gets more complicated. - el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), np, err.Error())} - return nil, errors.NewInvalid(api.Kind("Service"), service.Name, el) - } - servicePort.NodePort = int32(np) - svcPortToNodePort[int(servicePort.Port)] = np - } else if assignNodePorts { - nodePort, err := nodePortOp.AllocateNext() - if err != nil { - // TODO: what error should be returned here? It's not a - // field-level validation failure (the field is valid), and it's - // not really an internal error. - return nil, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) - } - servicePort.NodePort = int32(nodePort) - svcPortToNodePort[int(servicePort.Port)] = nodePort - } - } else if int(servicePort.NodePort) != allocatedNodePort { - if servicePort.NodePort == 0 { - servicePort.NodePort = int32(allocatedNodePort) - } else { - err := nodePortOp.Allocate(int(servicePort.NodePort)) - if err != nil { - // TODO: when validation becomes versioned, this gets more complicated. - el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), servicePort.NodePort, err.Error())} - return nil, errors.NewInvalid(api.Kind("Service"), service.Name, el) - } - } + if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { + if err := rs.initNodePort(service, nodePortOp); err != nil { + return nil, err } } @@ -586,6 +534,54 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) { return false, nil } +func (rs *REST) initNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { + assignNodePorts := shouldAssignNodePorts(service) + svcPortToNodePort := map[int]int{} + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + allocatedNodePort := svcPortToNodePort[int(servicePort.Port)] + if allocatedNodePort == 0 { + // This will only scan forward in the service.Spec.Ports list because any matches + // before the current port would have been found in svcPortToNodePort. This is really + // looking for any user provided values. + np := findRequestedNodePort(int(servicePort.Port), service.Spec.Ports) + if np != 0 { + err := nodePortOp.Allocate(np) + if err != nil { + // TODO: when validation becomes versioned, this gets more complicated. + el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), np, err.Error())} + return errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + servicePort.NodePort = int32(np) + svcPortToNodePort[int(servicePort.Port)] = np + } else if assignNodePorts { + nodePort, err := nodePortOp.AllocateNext() + if err != nil { + // TODO: what error should be returned here? It's not a + // field-level validation failure (the field is valid), and it's + // not really an internal error. + return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) + } + servicePort.NodePort = int32(nodePort) + svcPortToNodePort[int(servicePort.Port)] = nodePort + } + } else if int(servicePort.NodePort) != allocatedNodePort { + if servicePort.NodePort == 0 { + servicePort.NodePort = int32(allocatedNodePort) + } else { + err := nodePortOp.Allocate(int(servicePort.NodePort)) + if err != nil { + // TODO: when validation becomes versioned, this gets more complicated. + el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), servicePort.NodePort, err.Error())} + return errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + } + } + } + + return nil +} + func (rs *REST) updateNodePort(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { oldNodePorts := CollectServiceNodePorts(oldService) From 895da2cd4950603235bc77fd7c1efe65cda03f12 Mon Sep 17 00:00:00 2001 From: xiangpengzhao Date: Thu, 6 Jul 2017 15:16:38 +0800 Subject: [PATCH 2/3] Remove shouldAssignNodePorts logic in initNodePort; add test cases. --- pkg/registry/core/service/rest.go | 33 ++--- pkg/registry/core/service/rest_test.go | 183 ++++++++++++++++++++++++- 2 files changed, 190 insertions(+), 26 deletions(-) diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/rest.go index a5a9f9626c4..5e14617cefa 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/rest.go @@ -106,7 +106,7 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, includ defer nodePortOp.Finish() if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { - if err := rs.initNodePort(service, nodePortOp); err != nil { + if err := rs.initNodePorts(service, nodePortOp); err != nil { return nil, err } } @@ -333,11 +333,11 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest. // Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists. if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) && (service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) { - rs.releaseNodePort(oldService, nodePortOp) + rs.releaseNodePorts(oldService, nodePortOp) } // Update service from any type to NodePort or LoadBalancer, should update NodePort. if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { - if err := rs.updateNodePort(oldService, service, nodePortOp); err != nil { + if err := rs.updateNodePorts(oldService, service, nodePortOp); err != nil { return nil, false, err } } @@ -456,22 +456,6 @@ func CollectServiceNodePorts(service *api.Service) []int { return servicePorts } -func shouldAssignNodePorts(service *api.Service) bool { - switch service.Spec.Type { - case api.ServiceTypeLoadBalancer: - return true - case api.ServiceTypeNodePort: - return true - case api.ServiceTypeClusterIP: - return false - case api.ServiceTypeExternalName: - return false - default: - glog.Errorf("Unknown service type: %v", service.Spec.Type) - return false - } -} - // Loop through the service ports list, find one with the same port number and // NodePort specified, return this NodePort otherwise return 0. func findRequestedNodePort(port int, servicePorts []api.ServicePort) int { @@ -534,8 +518,7 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) { return false, nil } -func (rs *REST) initNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { - assignNodePorts := shouldAssignNodePorts(service) +func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { svcPortToNodePort := map[int]int{} for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] @@ -554,7 +537,7 @@ func (rs *REST) initNodePort(service *api.Service, nodePortOp *portallocator.Por } servicePort.NodePort = int32(np) svcPortToNodePort[int(servicePort.Port)] = np - } else if assignNodePorts { + } else { nodePort, err := nodePortOp.AllocateNext() if err != nil { // TODO: what error should be returned here? It's not a @@ -566,6 +549,8 @@ func (rs *REST) initNodePort(service *api.Service, nodePortOp *portallocator.Por svcPortToNodePort[int(servicePort.Port)] = nodePort } } else if int(servicePort.NodePort) != allocatedNodePort { + // TODO(xiangpengzhao): do we need to allocate a new NodePort in this case? + // Note: the current implementation is better, because it saves a NodePort. if servicePort.NodePort == 0 { servicePort.NodePort = int32(allocatedNodePort) } else { @@ -582,7 +567,7 @@ func (rs *REST) initNodePort(service *api.Service, nodePortOp *portallocator.Por return nil } -func (rs *REST) updateNodePort(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { +func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { oldNodePorts := CollectServiceNodePorts(oldService) newNodePorts := []int{} @@ -626,7 +611,7 @@ func (rs *REST) updateNodePort(oldService, newService *api.Service, nodePortOp * return nil } -func (rs *REST) releaseNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) { +func (rs *REST) releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) { nodePorts := CollectServiceNodePorts(service) for _, nodePort := range nodePorts { diff --git a/pkg/registry/core/service/rest_test.go b/pkg/registry/core/service/rest_test.go index 100eadb26c5..6b3093e3a35 100644 --- a/pkg/registry/core/service/rest_test.go +++ b/pkg/registry/core/service/rest_test.go @@ -1362,7 +1362,186 @@ func TestInitClusterIP(t *testing.T) { } } -func TestUpdateNodePort(t *testing.T) { +func TestInitNodePorts(t *testing.T) { + storage, _ := NewTestREST(t, nil) + nodePortOp := portallocator.StartOperation(storage.serviceNodePorts) + defer nodePortOp.Finish() + + testCases := []struct { + name string + service *api.Service + expectSpecifiedNodePorts []int + }{ + { + name: "Service doesn't have specified NodePort", + service: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeNodePort, + Ports: []api.ServicePort{ + { + Name: "port-tcp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolTCP, + }, + }, + }, + }, + expectSpecifiedNodePorts: []int{}, + }, + { + name: "Service has one specified NodePort", + service: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeNodePort, + Ports: []api.ServicePort{{ + Name: "port-tcp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolTCP, + NodePort: 30053, + }}, + }, + }, + expectSpecifiedNodePorts: []int{30053}, + }, + { + name: "Service has two same ports with different protocols and specifies same NodePorts", + service: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeNodePort, + Ports: []api.ServicePort{ + { + Name: "port-tcp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolTCP, + NodePort: 30054, + }, + { + Name: "port-udp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolUDP, + NodePort: 30054, + }, + }, + }, + }, + expectSpecifiedNodePorts: []int{30054, 30054}, + }, + { + name: "Service has two same ports with different protocols and specifies different NodePorts", + service: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeNodePort, + Ports: []api.ServicePort{ + { + Name: "port-tcp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolTCP, + NodePort: 30055, + }, + { + Name: "port-udp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolUDP, + NodePort: 30056, + }, + }, + }, + }, + expectSpecifiedNodePorts: []int{30055, 30056}, + }, + { + name: "Service has two different ports with different protocols and specifies different NodePorts", + service: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeNodePort, + Ports: []api.ServicePort{ + { + Name: "port-tcp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolTCP, + NodePort: 30057, + }, + { + Name: "port-udp", + Port: 54, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolUDP, + NodePort: 30058, + }, + }, + }, + }, + expectSpecifiedNodePorts: []int{30057, 30058}, + }, + { + name: "Service has two same ports with different protocols but only specifies one NodePort", + service: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeNodePort, + Ports: []api.ServicePort{ + { + Name: "port-tcp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolTCP, + NodePort: 30059, + }, + { + Name: "port-udp", + Port: 53, + TargetPort: intstr.FromInt(6502), + Protocol: api.ProtocolUDP, + }, + }, + }, + }, + expectSpecifiedNodePorts: []int{30059, 30059}, + }, + } + + for _, test := range testCases { + err := storage.initNodePorts(test.service, nodePortOp) + if err != nil { + t.Errorf("%q: unexpected error: %v", test.name, err) + continue + } + _ = nodePortOp.Commit() + + serviceNodePorts := CollectServiceNodePorts(test.service) + + if len(test.expectSpecifiedNodePorts) == 0 { + for _, nodePort := range serviceNodePorts { + if !storage.serviceNodePorts.Has(nodePort) { + t.Errorf("%q: unexpected NodePort %d, out of range", test.name, nodePort) + } + } + } else if !reflect.DeepEqual(serviceNodePorts, test.expectSpecifiedNodePorts) { + t.Errorf("%q: expected NodePorts %v, but got %v", test.name, test.expectSpecifiedNodePorts, serviceNodePorts) + } + + } +} + +func TestUpdateNodePorts(t *testing.T) { storage, _ := NewTestREST(t, nil) nodePortOp := portallocator.StartOperation(storage.serviceNodePorts) defer nodePortOp.Finish() @@ -1521,7 +1700,7 @@ func TestUpdateNodePort(t *testing.T) { } for _, test := range testCases { - err := storage.updateNodePort(test.oldService, test.newService, nodePortOp) + err := storage.updateNodePorts(test.oldService, test.newService, nodePortOp) if err != nil { t.Errorf("%q: unexpected error: %v", test.name, err) continue From 472e10faf9512b891a8d4cafa77ad42c2f8e72aa Mon Sep 17 00:00:00 2001 From: xiangpengzhao Date: Fri, 7 Jul 2017 12:00:24 +0800 Subject: [PATCH 3/3] Fix ClusterIP leak flake and potential NodePort leak --- pkg/registry/core/service/BUILD | 1 + pkg/registry/core/service/rest_test.go | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/registry/core/service/BUILD b/pkg/registry/core/service/BUILD index 96159cc2b40..dd14cf29a46 100644 --- a/pkg/registry/core/service/BUILD +++ b/pkg/registry/core/service/BUILD @@ -59,6 +59,7 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/api/helper:go_default_library", "//pkg/api/service:go_default_library", "//pkg/api/testing:go_default_library", "//pkg/features:go_default_library", diff --git a/pkg/registry/core/service/rest_test.go b/pkg/registry/core/service/rest_test.go index 6b3093e3a35..4ea0a6ef814 100644 --- a/pkg/registry/core/service/rest_test.go +++ b/pkg/registry/core/service/rest_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apiserver/pkg/registry/rest" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/helper" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" @@ -1359,6 +1360,12 @@ func TestInitClusterIP(t *testing.T) { if test.name == "Allocate specified ClusterIP" && test.svc.Spec.ClusterIP != "1.2.3.4" { t.Errorf("%q: expected ClusterIP %q, but got %q", test.name, "1.2.3.4", test.svc.Spec.ClusterIP) } + + if hasAllocatedIP { + if helper.IsServiceIPSet(test.svc) { + storage.serviceIPs.Release(net.ParseIP(test.svc.Spec.ClusterIP)) + } + } } } @@ -1524,7 +1531,6 @@ func TestInitNodePorts(t *testing.T) { t.Errorf("%q: unexpected error: %v", test.name, err) continue } - _ = nodePortOp.Commit() serviceNodePorts := CollectServiceNodePorts(test.service) @@ -1705,7 +1711,6 @@ func TestUpdateNodePorts(t *testing.T) { t.Errorf("%q: unexpected error: %v", test.name, err) continue } - _ = nodePortOp.Commit() serviceNodePorts := CollectServiceNodePorts(test.newService)