diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index d9a8eb0e76b..d94a60fb693 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" featuregate "k8s.io/kubernetes/pkg/util/config" utilnet "k8s.io/kubernetes/pkg/util/net" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/watch" ) @@ -157,7 +158,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err } } - if featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && shouldCheckOrAssignHealthCheckNodePort(service) { + if shouldCheckOrAssignHealthCheckNodePort(service) { var healthCheckNodePort int var err error if l, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok { @@ -234,6 +235,16 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { } } + if shouldCheckOrAssignHealthCheckNodePort(service) { + nodePort := apiservice.GetServiceHealthCheckNodePort(service) + if nodePort > 0 { + err := rs.serviceNodePorts.Release(int(nodePort)) + if err != nil { + // these should be caught by an eventual reconciliation / restart + utilruntime.HandleError(fmt.Errorf("Error releasing service health check %s node port %d: %v", service.Name, nodePort, err)) + } + } + } return &unversioned.Status{Status: unversioned.StatusSuccess}, nil } @@ -265,6 +276,97 @@ func (*REST) NewList() runtime.Object { return &api.ServiceList{} } +func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service) (bool, error) { + // Health Check Node Port handling during updates + // + // Case 1. Transition from globalTraffic to OnlyLocal for the ESIPP annotation + // + // Allocate a health check node port or attempt to reserve the user-specified one, if provided. + // Insert health check node port as an annotation into the service's annotations + // + // Case 2. Transition from OnlyLocal to Global for the ESIPP annotation + // + // Free the existing healthCheckNodePort and clear the health check nodePort annotation + // + // Case 3. No change (Global ---stays--> Global) but prevent invalid annotation manipulations + // + // Reject insertion of the "service.alpha.kubernetes.io/healthcheck-nodeport" annotation + // + // Case 4. No change (OnlyLocal ---stays--> OnlyLocal) but prevent invalid annotation manipulations + // + // Reject deletion of the "service.alpha.kubernetes.io/healthcheck-nodeport" annotation + // Reject changing the value of the healthCheckNodePort annotation + // + oldServiceHasHealthCheckNodePort := shouldCheckOrAssignHealthCheckNodePort(oldService) + oldHealthCheckNodePort := apiservice.GetServiceHealthCheckNodePort(oldService) + + assignHealthCheckNodePort := shouldCheckOrAssignHealthCheckNodePort(service) + requestedHealthCheckNodePort := apiservice.GetServiceHealthCheckNodePort(service) + + switch { + case !oldServiceHasHealthCheckNodePort && assignHealthCheckNodePort: + glog.Infof("Transition from Global LB service to OnlyLocal service") + if requestedHealthCheckNodePort > 0 { + // If the request has a health check nodePort in mind, attempt to reserve it + err := rs.serviceNodePorts.Allocate(int(requestedHealthCheckNodePort)) + if err != nil { + errmsg := fmt.Sprintf("Failed to allocate requested HealthCheck nodePort %v:%v", + requestedHealthCheckNodePort, err) + el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), + apiservice.AnnotationHealthCheckNodePort, errmsg)} + return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + glog.Infof("Reserved user requested nodePort: %d", requestedHealthCheckNodePort) + } else { + // If the request has no health check nodePort specified, allocate any + healthCheckNodePort, err := rs.serviceNodePorts.AllocateNext() + if err != nil { + // TODO: what error should be returned here? It's not a + // field-level validation failure (the field is valid), and it's + // not really an internal error. + return false, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) + } + // Insert the newly allocated health check port as an annotation (plan of record for Alpha) + service.Annotations[apiservice.AnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort) + glog.Infof("Reserved health check nodePort: %d", healthCheckNodePort) + } + + case oldServiceHasHealthCheckNodePort && !assignHealthCheckNodePort: + glog.Infof("Transition from OnlyLocal LB service to Global service") + err := rs.serviceNodePorts.Release(int(oldHealthCheckNodePort)) + if err != nil { + glog.Warningf("Error releasing service health check %s node port %d: %v", service.Name, oldHealthCheckNodePort, err) + return false, errors.NewInternalError(fmt.Errorf("failed to free health check nodePort: %v", err)) + } else { + delete(service.Annotations, apiservice.AnnotationHealthCheckNodePort) + glog.Infof("Freed health check nodePort: %d", oldHealthCheckNodePort) + } + + case !oldServiceHasHealthCheckNodePort && !assignHealthCheckNodePort: + if _, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok { + glog.Warningf("Attempt to insert health check node port annotation DENIED") + el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), + apiservice.AnnotationHealthCheckNodePort, "Cannot insert healthcheck nodePort annotation")} + return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + + case oldServiceHasHealthCheckNodePort && assignHealthCheckNodePort: + if _, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; !ok { + glog.Warningf("Attempt to delete health check node port annotation DENIED") + el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), + apiservice.AnnotationHealthCheckNodePort, "Cannot delete healthcheck nodePort annotation")} + return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + if oldHealthCheckNodePort != requestedHealthCheckNodePort { + glog.Warningf("Attempt to change value of health check node port annotation DENIED") + el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"), + apiservice.AnnotationHealthCheckNodePort, "Cannot change healthcheck nodePort during update")} + return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) + } + } + return true, nil +} + func (rs *REST) Update(ctx api.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) { oldService, err := rs.registry.GetService(ctx, name) if err != nil { @@ -342,6 +444,11 @@ func (rs *REST) Update(ctx api.Context, name string, objInfo rest.UpdatedObjectI service.Status.LoadBalancer = api.LoadBalancerStatus{} } + success, err := rs.healthCheckNodePortUpdate(oldService, service) + if !success { + return nil, false, err + } + out, err := rs.registry.UpdateService(ctx, service) if err == nil {