Allocate clusterIP when change service type from ExternalName to ClusterIP

This commit is contained in:
xiangpengzhao 2017-05-22 14:23:23 +08:00
parent 18362beb0d
commit 4e9abca6d0
5 changed files with 224 additions and 60 deletions

View File

@ -28,7 +28,7 @@ package portallocator
// ...
// write(updatedOwner)
/// op.Commit()
type portAllocationOperation struct {
type PortAllocationOperation struct {
pa Interface
allocated []int
releaseDeferred []int
@ -36,8 +36,8 @@ type portAllocationOperation struct {
}
// Creates a portAllocationOperation, tracking a set of allocations & releases
func StartOperation(pa Interface) *portAllocationOperation {
op := &portAllocationOperation{}
func StartOperation(pa Interface) *PortAllocationOperation {
op := &PortAllocationOperation{}
op.pa = pa
op.allocated = []int{}
op.releaseDeferred = []int{}
@ -46,14 +46,14 @@ func StartOperation(pa Interface) *portAllocationOperation {
}
// Will rollback unless marked as shouldRollback = false by a Commit(). Call from a defer block
func (op *portAllocationOperation) Finish() {
func (op *PortAllocationOperation) Finish() {
if op.shouldRollback {
op.Rollback()
}
}
// (Try to) undo any operations we did
func (op *portAllocationOperation) Rollback() []error {
func (op *PortAllocationOperation) Rollback() []error {
errors := []error{}
for _, allocated := range op.allocated {
@ -72,7 +72,7 @@ func (op *portAllocationOperation) Rollback() []error {
// (Try to) perform any deferred operations.
// Note that even if this fails, we don't rollback; we always want to err on the side of over-allocation,
// and Commit should be called _after_ the owner is written
func (op *portAllocationOperation) Commit() []error {
func (op *PortAllocationOperation) Commit() []error {
errors := []error{}
for _, release := range op.releaseDeferred {
@ -94,7 +94,7 @@ func (op *portAllocationOperation) Commit() []error {
}
// Allocates a port, and record it for future rollback
func (op *portAllocationOperation) Allocate(port int) error {
func (op *PortAllocationOperation) Allocate(port int) error {
err := op.pa.Allocate(port)
if err == nil {
op.allocated = append(op.allocated, port)
@ -103,7 +103,7 @@ func (op *portAllocationOperation) Allocate(port int) error {
}
// Allocates a port, and record it for future rollback
func (op *portAllocationOperation) AllocateNext() (int, error) {
func (op *PortAllocationOperation) AllocateNext() (int, error) {
port, err := op.pa.AllocateNext()
if err == nil {
op.allocated = append(op.allocated, port)
@ -112,6 +112,6 @@ func (op *portAllocationOperation) AllocateNext() (int, error) {
}
// Marks a port so that it will be released if this operation Commits
func (op *portAllocationOperation) ReleaseDeferred(port int) {
func (op *PortAllocationOperation) ReleaseDeferred(port int) {
op.releaseDeferred = append(op.releaseDeferred, port)
}

View File

@ -357,58 +357,45 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
}
// TODO: this should probably move to strategy.PrepareForCreate()
releaseServiceIP := false
defer func() {
if releaseServiceIP {
if helper.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
}
}
}()
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
defer nodePortOp.Finish()
assignNodePorts := shouldAssignNodePorts(service)
oldNodePorts := CollectServiceNodePorts(oldService)
newNodePorts := []int{}
if assignNodePorts {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
nodePort := int(servicePort.NodePort)
if nodePort != 0 {
if !contains(oldNodePorts, nodePort) {
err := nodePortOp.Allocate(nodePort)
if err != nil {
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort, err.Error())}
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
}
} else {
nodePort, err = nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return nil, false, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
servicePort.NodePort = int32(nodePort)
}
// Detect duplicate node ports; this should have been caught by validation, so we panic
if contains(newNodePorts, nodePort) {
panic("duplicate node port")
}
newNodePorts = append(newNodePorts, nodePort)
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
return nil, false, err
}
} else {
// Validate should have validated that nodePort == 0
}
// The comparison loops are O(N^2), but we don't expect N to be huge
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
for _, oldNodePort := range oldNodePorts {
if contains(newNodePorts, oldNodePort) {
continue
// Update service from non-ExternalName to ExternalName, should release ClusterIP if exists.
if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
if helper.IsServiceIPSet(oldService) {
rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP))
}
nodePortOp.ReleaseDeferred(oldNodePort)
}
// Remove any LoadBalancerStatus now if Type != LoadBalancer;
// although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity.
// Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
(service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
rs.releaseNodePort(oldService, nodePortOp)
}
// Update service from any type to NodePort or LoadBalancer, should update NodePort.
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
if err := rs.updateNodePort(oldService, service, nodePortOp); err != nil {
return nil, false, err
}
}
// Update service from LoadBalancer to non-LoadBalancer, should remove any LoadBalancerStatus.
if service.Spec.Type != api.ServiceTypeLoadBalancer {
// Although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity.
service.Status.LoadBalancer = api.LoadBalancerStatus{}
}
@ -425,13 +412,14 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
}
out, err := rs.registry.UpdateService(ctx, service)
if err == nil {
el := nodePortOp.Commit()
if el != nil {
// problems should be fixed by an eventual reconciliation / restart
glog.Errorf("error(s) committing NodePorts changes: %v", el)
}
releaseServiceIP = false
}
return out, false, err
@ -570,3 +558,82 @@ func (rs *REST) allocateHealthCheckNodePort(service *api.Service) error {
}
return nil
}
// The return bool value indicates if the caller should release clusterIP before return
func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
switch {
case service.Spec.ClusterIP == "":
// Allocate next available.
ip, err := rs.serviceIPs.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return false, errors.NewInternalError(fmt.Errorf("failed to allocate a serviceIP: %v", err))
}
service.Spec.ClusterIP = ip.String()
return true, nil
case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "":
// Try to respect the requested IP.
if err := rs.serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
return true, nil
}
return false, nil
}
func (rs *REST) updateNodePort(oldService, service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
oldNodePorts := CollectServiceNodePorts(oldService)
newNodePorts := []int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
nodePort := int(servicePort.NodePort)
if nodePort != 0 {
if !contains(oldNodePorts, nodePort) {
err := nodePortOp.Allocate(nodePort)
if err != nil {
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort, err.Error())}
return errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
}
} else {
nodePort, err := nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
servicePort.NodePort = int32(nodePort)
}
// Detect duplicate node ports; this should have been caught by validation, so we panic
if contains(newNodePorts, nodePort) {
panic("duplicate node port")
}
newNodePorts = append(newNodePorts, nodePort)
}
// The comparison loops are O(N^2), but we don't expect N to be huge
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
for _, oldNodePort := range oldNodePorts {
if contains(newNodePorts, oldNodePort) {
continue
}
nodePortOp.ReleaseDeferred(oldNodePort)
}
return nil
}
func (rs *REST) releaseNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
nodePorts := CollectServiceNodePorts(service)
for _, nodePort := range nodePorts {
nodePortOp.ReleaseDeferred(nodePort)
}
}

View File

@ -225,7 +225,6 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client
}
func validateDNSResults(f *framework.Framework, pod *v1.Pod, fileNames []string) {
By("submitting the pod to kubernetes")
podClient := f.ClientSet.Core().Pods(f.Namespace.Name)
defer func() {
@ -254,7 +253,6 @@ func validateDNSResults(f *framework.Framework, pod *v1.Pod, fileNames []string)
}
func validateTargetedProbeOutput(f *framework.Framework, pod *v1.Pod, fileNames []string, value string) {
By("submitting the pod to kubernetes")
podClient := f.ClientSet.Core().Pods(f.Namespace.Name)
defer func() {
@ -424,6 +422,10 @@ var _ = framework.KubeDescribe("DNS", func() {
})
It("should provide DNS for ExternalName services", func() {
// TODO(xiangpengzhao): allow AWS when pull-kubernetes-e2e-kops-aws and pull-kubernetes-e2e-gce-etcd3
// have the same "service-cluster-ip-range". See: https://github.com/kubernetes/kubernetes/issues/47224
framework.SkipUnlessProviderIs("gce")
// Create a test ExternalName service.
By("Creating a test externalName service")
serviceName := "dns-test-service-3"
@ -469,7 +471,7 @@ var _ = framework.KubeDescribe("DNS", func() {
By("changing the service to type=ClusterIP")
_, err = framework.UpdateService(f.ClientSet, f.Namespace.Name, serviceName, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeClusterIP
s.Spec.ClusterIP = "127.1.2.3"
s.Spec.ClusterIP = "10.0.0.123"
s.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: "TCP"},
}
@ -484,6 +486,6 @@ var _ = framework.KubeDescribe("DNS", func() {
By("creating a third pod to probe DNS")
pod3 := createDNSPod(f.Namespace.Name, wheezyProbeCmd, jessieProbeCmd, true)
validateTargetedProbeOutput(f, pod3, []string{wheezyFileName, jessieFileName}, "127.1.2.3")
validateTargetedProbeOutput(f, pod3, []string{wheezyFileName, jessieFileName}, "10.0.0.123")
})
})

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/retry"
@ -177,6 +178,31 @@ func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc
return result
}
// CreateExternalNameServiceOrFail creates a new ExternalName type Service based on the jig's defaults.
// Callers can provide a function to tweak the Service object before it is created.
func (j *ServiceTestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: j.Name,
Labels: j.Labels,
},
Spec: v1.ServiceSpec{
Selector: j.Labels,
ExternalName: "foo.example.com",
Type: v1.ServiceTypeExternalName,
},
}
if tweak != nil {
tweak(svc)
}
result, err := j.Client.Core().Services(namespace).Create(svc)
if err != nil {
Failf("Failed to create ExternalName Service %q: %v", svc.Name, err)
}
return result
}
func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) {
ingressIP := ""
svc := j.UpdateServiceOrFail(namespace, name, func(s *v1.Service) {
@ -373,8 +399,18 @@ func (j *ServiceTestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceT
if svc.Spec.Type != svcType {
Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
}
if svcType != v1.ServiceTypeExternalName {
if svc.Spec.ExternalName != "" {
Failf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
}
if svc.Spec.ClusterIP != api.ClusterIPNone && svc.Spec.ClusterIP == "" {
Failf("didn't get ClusterIP for non-ExternamName service")
}
}
expectNodePorts := false
if svcType != v1.ServiceTypeClusterIP {
if svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName {
expectNodePorts = true
}
for i, port := range svc.Spec.Ports {

View File

@ -789,6 +789,65 @@ var _ = framework.KubeDescribe("Services", func() {
}
})
It("should be able to change the type from ExternalName to ClusterIP", func() {
serviceName := "externalname-service"
ns := f.Namespace.Name
jig := framework.NewServiceTestJig(cs, serviceName)
By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns)
externalNameService := jig.CreateExternalNameServiceOrFail(ns, nil)
jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName)
By("changing the ExternalName service to type=ClusterIP")
clusterIPService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeClusterIP
s.Spec.ExternalName = ""
s.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: "TCP"},
}
})
jig.SanityCheckService(clusterIPService, v1.ServiceTypeClusterIP)
})
It("should be able to change the type from ExternalName to NodePort", func() {
serviceName := "externalname-service"
ns := f.Namespace.Name
jig := framework.NewServiceTestJig(cs, serviceName)
By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns)
externalNameService := jig.CreateExternalNameServiceOrFail(ns, nil)
jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName)
By("changing the ExternalName service to type=NodePort")
nodePortService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeNodePort
s.Spec.ExternalName = ""
s.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: "TCP"},
}
})
jig.SanityCheckService(nodePortService, v1.ServiceTypeNodePort)
})
It("should be able to change the type from ExternalName to LoadBalancer", func() {
serviceName := "externalname-service"
ns := f.Namespace.Name
loadBalancerCreateTimeout := framework.LoadBalancerCreateTimeoutDefault
jig := framework.NewServiceTestJig(cs, serviceName)
By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns)
externalNameService := jig.CreateExternalNameServiceOrFail(ns, nil)
jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName)
By("changing the ExternalName service to type=LoadBalancer")
loadBalancerIPService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeLoadBalancer
s.Spec.ExternalName = ""
s.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: "TCP"},
}
})
loadBalancerIPService = jig.WaitForLoadBalancerOrFail(ns, loadBalancerIPService.Name, loadBalancerCreateTimeout)
jig.SanityCheckService(loadBalancerIPService, v1.ServiceTypeLoadBalancer)
})
It("should use same NodePort with same port but different protocols", func() {
serviceName := "nodeports"
ns := f.Namespace.Name
@ -866,7 +925,7 @@ var _ = framework.KubeDescribe("Services", func() {
}
port := result.Spec.Ports[0]
if port.NodePort == 0 {
framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", result)
framework.Failf("got unexpected Spec.Ports[0].NodePort for new service: %v", result)
}
By("creating service " + serviceName2 + " with conflicting NodePort")