Merge pull request #96684 from thockin/rest-hooks-use-by-svc

Simplify and de-layer Service REST implementation
This commit is contained in:
Kubernetes Prow Robot 2021-09-11 12:40:06 -07:00 committed by GitHub
commit dd2d12f6dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 14057 additions and 6137 deletions

View File

@ -25973,6 +25973,127 @@
}
},
"/api/v1/namespaces/{namespace}/services": {
"delete": {
"consumes": [
"*/*"
],
"description": "delete collection of Service",
"operationId": "deleteCoreV1CollectionNamespacedService",
"parameters": [
{
"in": "body",
"name": "body",
"schema": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.DeleteOptions"
}
},
{
"description": "The continue option should be set when retrieving more results from the server. Since this value is server defined, clients may only use the continue value from a previous query result with identical query parameters (except for the value of continue) and the server may reject a continue value it does not recognize. If the specified continue value is no longer valid whether due to expiration (generally five to fifteen minutes) or a configuration change on the server, the server will respond with a 410 ResourceExpired error together with a continue token. If the client needs a consistent list, it must restart their list without the continue field. Otherwise, the client may send another list request with the token received with the 410 error, the server will respond with a list starting from the next key, but from the latest snapshot, which is inconsistent from the previous list results - objects that are created, modified, or deleted after the first list request will be included in the response, as long as their keys are after the \"next key\".\n\nThis field is not supported when watch is true. Clients may start a watch from the last resourceVersion value returned by the server and not miss any modifications.",
"in": "query",
"name": "continue",
"type": "string",
"uniqueItems": true
},
{
"description": "When present, indicates that modifications should not be persisted. An invalid or unrecognized dryRun directive will result in an error response and no further processing of the request. Valid values are: - All: all dry run stages will be processed",
"in": "query",
"name": "dryRun",
"type": "string",
"uniqueItems": true
},
{
"description": "A selector to restrict the list of returned objects by their fields. Defaults to everything.",
"in": "query",
"name": "fieldSelector",
"type": "string",
"uniqueItems": true
},
{
"description": "The duration in seconds before the object should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is nil, the default grace period for the specified type will be used. Defaults to a per object value if not specified. zero means delete immediately.",
"in": "query",
"name": "gracePeriodSeconds",
"type": "integer",
"uniqueItems": true
},
{
"description": "A selector to restrict the list of returned objects by their labels. Defaults to everything.",
"in": "query",
"name": "labelSelector",
"type": "string",
"uniqueItems": true
},
{
"description": "limit is a maximum number of responses to return for a list call. If more items exist, the server will set the `continue` field on the list metadata to a value that can be used with the same initial query to retrieve the next set of results. Setting a limit may return fewer than the requested amount of items (up to zero items) in the event all requested objects are filtered out and clients should only use the presence of the continue field to determine whether more results are available. Servers may choose not to support the limit argument and will return all of the available results. If limit is specified and the continue field is empty, clients may assume that no more results are available. This field is not supported if watch is true.\n\nThe server guarantees that the objects returned when using continue will be identical to issuing a single list call without a limit - that is, no objects created, modified, or deleted after the first request is issued will be included in any subsequent continued requests. This is sometimes referred to as a consistent snapshot, and ensures that a client that is using limit to receive smaller chunks of a very large result can ensure they see all possible objects. If objects are updated during a chunked list the version of the object that was present at the time the first list result was calculated is returned.",
"in": "query",
"name": "limit",
"type": "integer",
"uniqueItems": true
},
{
"description": "Deprecated: please use the PropagationPolicy, this field will be deprecated in 1.7. Should the dependent objects be orphaned. If true/false, the \"orphan\" finalizer will be added to/removed from the object's finalizers list. Either this field or PropagationPolicy may be set, but not both.",
"in": "query",
"name": "orphanDependents",
"type": "boolean",
"uniqueItems": true
},
{
"description": "Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are: 'Orphan' - orphan the dependents; 'Background' - allow the garbage collector to delete the dependents in the background; 'Foreground' - a cascading policy that deletes all dependents in the foreground.",
"in": "query",
"name": "propagationPolicy",
"type": "string",
"uniqueItems": true
},
{
"description": "resourceVersion sets a constraint on what resource versions a request may be served from. See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details.\n\nDefaults to unset",
"in": "query",
"name": "resourceVersion",
"type": "string",
"uniqueItems": true
},
{
"description": "resourceVersionMatch determines how resourceVersion is applied to list calls. It is highly recommended that resourceVersionMatch be set for list calls where resourceVersion is set See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details.\n\nDefaults to unset",
"in": "query",
"name": "resourceVersionMatch",
"type": "string",
"uniqueItems": true
},
{
"description": "Timeout for the list/watch call. This limits the duration of the call, regardless of any activity or inactivity.",
"in": "query",
"name": "timeoutSeconds",
"type": "integer",
"uniqueItems": true
}
],
"produces": [
"application/json",
"application/yaml",
"application/vnd.kubernetes.protobuf"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Status"
}
},
"401": {
"description": "Unauthorized"
}
},
"schemes": [
"https"
],
"tags": [
"core_v1"
],
"x-kubernetes-action": "deletecollection",
"x-kubernetes-group-version-kind": {
"group": "",
"kind": "Service",
"version": "v1"
}
},
"get": {
"consumes": [
"*/*"
@ -26217,13 +26338,13 @@
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Status"
"$ref": "#/definitions/io.k8s.api.core.v1.Service"
}
},
"202": {
"description": "Accepted",
"schema": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Status"
"$ref": "#/definitions/io.k8s.api.core.v1.Service"
}
},
"401": {

View File

@ -48,7 +48,8 @@ func MakeService(name string, tweaks ...Tweak) *api.Service {
SetTypeClusterIP(svc)
// Default to 1 port
SetPorts(MakeServicePort("", 93, intstr.FromInt(76), api.ProtocolTCP))(svc)
// Default internalTrafficPolicy to "Cluster"
// Default internalTrafficPolicy to "Cluster". This probably should not
// apply to ExternalName, but it went into beta and is not worth breaking.
SetInternalTrafficPolicy(api.ServiceInternalTrafficPolicyCluster)(svc)
for _, tweak := range tweaks {
@ -113,6 +114,29 @@ func MakeServicePort(name string, port int, tgtPort intstr.IntOrString, proto ap
}
}
// SetHeadless sets the service as headless and clears other fields.
func SetHeadless(svc *api.Service) {
SetTypeClusterIP(svc)
svc.Spec.ClusterIP = api.ClusterIPNone
}
// SetSelector sets the service selector.
func SetSelector(sel map[string]string) Tweak {
return func(svc *api.Service) {
svc.Spec.Selector = map[string]string{}
for k, v := range sel {
svc.Spec.Selector[k] = v
}
}
}
// SetClusterIP sets the service ClusterIP fields.
func SetClusterIP(ip string) Tweak {
return func(svc *api.Service) {
svc.Spec.ClusterIP = ip
}
}
// SetClusterIPs sets the service ClusterIP and ClusterIPs fields.
func SetClusterIPs(ips ...string) Tweak {
return func(svc *api.Service) {
@ -169,9 +193,41 @@ func SetAllocateLoadBalancerNodePorts(val bool) Tweak {
}
}
// SetUniqueNodePorts sets all nodeports to unique values.
func SetUniqueNodePorts(svc *api.Service) {
for i := range svc.Spec.Ports {
svc.Spec.Ports[i].NodePort = int32(30000 + i)
}
}
// SetHealthCheckNodePort sets the healthCheckNodePort field for a Service.
func SetHealthCheckNodePort(value int32) Tweak {
return func(svc *api.Service) {
svc.Spec.HealthCheckNodePort = value
}
}
// SetSessionAffinity sets the SessionAffinity field.
func SetSessionAffinity(affinity api.ServiceAffinity) Tweak {
return func(svc *api.Service) {
svc.Spec.SessionAffinity = affinity
switch affinity {
case api.ServiceAffinityNone:
svc.Spec.SessionAffinityConfig = nil
case api.ServiceAffinityClientIP:
timeout := int32(10)
svc.Spec.SessionAffinityConfig = &api.SessionAffinityConfig{
ClientIP: &api.ClientIPConfig{
TimeoutSeconds: &timeout,
},
}
}
}
}
// SetExternalName sets the ExternalName field.
func SetExternalName(val string) Tweak {
return func(svc *api.Service) {
svc.Spec.ExternalName = val
}
}

View File

@ -45,6 +45,7 @@ import (
schedulinghelper "k8s.io/component-helpers/scheduling/corev1"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/apis/core"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
podshelper "k8s.io/kubernetes/pkg/apis/core/pods"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
@ -4357,7 +4358,7 @@ func ValidateService(service *core.Service) field.ErrorList {
}
// dualstack <-> ClusterIPs <-> ipfamilies
allErrs = append(allErrs, validateServiceClusterIPsRelatedFields(service)...)
allErrs = append(allErrs, ValidateServiceClusterIPsRelatedFields(service)...)
ipPath := specPath.Child("externalIPs")
for i, ip := range service.Spec.ExternalIPs {
@ -4453,8 +4454,8 @@ func ValidateService(service *core.Service) field.ErrorList {
// validate LoadBalancerClass field
allErrs = append(allErrs, validateLoadBalancerClassField(nil, service)...)
// external traffic fields
allErrs = append(allErrs, validateServiceExternalTrafficFieldsValue(service)...)
// external traffic policy fields
allErrs = append(allErrs, validateServiceExternalTrafficPolicy(service)...)
// internal traffic policy field
allErrs = append(allErrs, validateServiceInternalTrafficFieldsValue(service)...)
@ -4506,22 +4507,58 @@ func validateServicePort(sp *core.ServicePort, requireName, isHeadlessService bo
return allErrs
}
// validateServiceExternalTrafficFieldsValue validates ExternalTraffic related annotations
// have legal value.
func validateServiceExternalTrafficFieldsValue(service *core.Service) field.ErrorList {
func needsExternalTrafficPolicy(svc *api.Service) bool {
return svc.Spec.Type == core.ServiceTypeLoadBalancer || svc.Spec.Type == core.ServiceTypeNodePort
}
var validExternalTrafficPolicies = sets.NewString(
string(core.ServiceExternalTrafficPolicyTypeCluster),
string(core.ServiceExternalTrafficPolicyTypeLocal))
func validateServiceExternalTrafficPolicy(service *core.Service) field.ErrorList {
allErrs := field.ErrorList{}
// Check first class fields.
if service.Spec.ExternalTrafficPolicy != "" &&
service.Spec.ExternalTrafficPolicy != core.ServiceExternalTrafficPolicyTypeCluster &&
service.Spec.ExternalTrafficPolicy != core.ServiceExternalTrafficPolicyTypeLocal {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("externalTrafficPolicy"), service.Spec.ExternalTrafficPolicy,
fmt.Sprintf("ExternalTrafficPolicy must be empty, %v or %v", core.ServiceExternalTrafficPolicyTypeCluster, core.ServiceExternalTrafficPolicyTypeLocal)))
fldPath := field.NewPath("spec")
if !needsExternalTrafficPolicy(service) {
if service.Spec.ExternalTrafficPolicy != "" {
allErrs = append(allErrs, field.Invalid(fldPath.Child("externalTrafficPolicy"), service.Spec.ExternalTrafficPolicy,
"may only be set when `type` is 'NodePort' or 'LoadBalancer'"))
}
} else {
if service.Spec.ExternalTrafficPolicy == "" {
allErrs = append(allErrs, field.Required(fldPath.Child("externalTrafficPolicy"), ""))
} else if !validExternalTrafficPolicies.Has(string(service.Spec.ExternalTrafficPolicy)) {
allErrs = append(allErrs, field.NotSupported(fldPath.Child("externalTrafficPolicy"),
service.Spec.ExternalTrafficPolicy, validExternalTrafficPolicies.List()))
}
}
if service.Spec.HealthCheckNodePort < 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("healthCheckNodePort"), service.Spec.HealthCheckNodePort,
"HealthCheckNodePort must be not less than 0"))
if !apiservice.NeedsHealthCheck(service) {
if service.Spec.HealthCheckNodePort != 0 {
allErrs = append(allErrs, field.Invalid(fldPath.Child("healthCheckNodePort"), service.Spec.HealthCheckNodePort,
"may only be set when `type` is 'LoadBalancer' and `externalTrafficPolicy` is 'Local'"))
}
} else {
if service.Spec.HealthCheckNodePort == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("healthCheckNodePort"), ""))
} else {
for _, msg := range validation.IsValidPortNum(int(service.Spec.HealthCheckNodePort)) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("healthCheckNodePort"), service.Spec.HealthCheckNodePort, msg))
}
}
}
return allErrs
}
func validateServiceExternalTrafficFieldsUpdate(before, after *api.Service) field.ErrorList {
allErrs := field.ErrorList{}
if apiservice.NeedsHealthCheck(before) && apiservice.NeedsHealthCheck(after) {
if after.Spec.HealthCheckNodePort != before.Spec.HealthCheckNodePort {
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec", "healthCheckNodePort"), "field is immutable"))
}
}
return allErrs
@ -4545,29 +4582,6 @@ func validateServiceInternalTrafficFieldsValue(service *core.Service) field.Erro
return allErrs
}
// ValidateServiceExternalTrafficFieldsCombination validates if ExternalTrafficPolicy,
// HealthCheckNodePort and Type combination are legal. For update, it should be called
// after clearing externalTraffic related fields for the ease of transitioning between
// different service types.
func ValidateServiceExternalTrafficFieldsCombination(service *core.Service) field.ErrorList {
allErrs := field.ErrorList{}
if service.Spec.Type != core.ServiceTypeLoadBalancer &&
service.Spec.Type != core.ServiceTypeNodePort &&
service.Spec.ExternalTrafficPolicy != "" {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "externalTrafficPolicy"), service.Spec.ExternalTrafficPolicy,
"ExternalTrafficPolicy can only be set on NodePort and LoadBalancer service"))
}
if !apiservice.NeedsHealthCheck(service) &&
service.Spec.HealthCheckNodePort != 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "healthCheckNodePort"), service.Spec.HealthCheckNodePort,
"HealthCheckNodePort can only be set on LoadBalancer service with ExternalTrafficPolicy=Local"))
}
return allErrs
}
// ValidateServiceCreate validates Services as they are created.
func ValidateServiceCreate(service *core.Service) field.ErrorList {
return ValidateService(service)
@ -4591,6 +4605,8 @@ func ValidateServiceUpdate(service, oldService *core.Service) field.ErrorList {
upgradeDowngradeLoadBalancerClassErrs := validateLoadBalancerClassField(oldService, service)
allErrs = append(allErrs, upgradeDowngradeLoadBalancerClassErrs...)
allErrs = append(allErrs, validateServiceExternalTrafficFieldsUpdate(oldService, service)...)
return append(allErrs, ValidateService(service)...)
}
@ -6289,8 +6305,10 @@ func ValidateSpreadConstraintNotRepeat(fldPath *field.Path, constraint core.Topo
return nil
}
// validateServiceClusterIPsRelatedFields validates .spec.ClusterIPs,, .spec.IPFamilies, .spec.ipFamilyPolicy
func validateServiceClusterIPsRelatedFields(service *core.Service) field.ErrorList {
// ValidateServiceClusterIPsRelatedFields validates .spec.ClusterIPs,,
// .spec.IPFamilies, .spec.ipFamilyPolicy. This is exported because it is used
// during IP init and allocation.
func ValidateServiceClusterIPsRelatedFields(service *core.Service) field.ErrorList {
// ClusterIP, ClusterIPs, IPFamilyPolicy and IPFamilies are validated prior (all must be unset) for ExternalName service
if service.Spec.Type == core.ServiceTypeExternalName {
return field.ErrorList{}
@ -6312,12 +6330,12 @@ func validateServiceClusterIPsRelatedFields(service *core.Service) field.ErrorLi
if len(service.Spec.ClusterIPs) == 0 {
allErrs = append(allErrs, field.Required(clusterIPsField, ""))
} else if service.Spec.ClusterIPs[0] != service.Spec.ClusterIP {
allErrs = append(allErrs, field.Invalid(clusterIPsField, service.Spec.ClusterIPs, "element [0] must match clusterIP"))
allErrs = append(allErrs, field.Invalid(clusterIPsField, service.Spec.ClusterIPs, "first value must match `clusterIP`"))
}
} else { // ClusterIP == ""
// If ClusterIP is not set, ClusterIPs must also be unset.
if len(service.Spec.ClusterIPs) != 0 {
allErrs = append(allErrs, field.Invalid(clusterIPsField, service.Spec.ClusterIPs, "must be empty when clusterIP is empty"))
allErrs = append(allErrs, field.Invalid(clusterIPsField, service.Spec.ClusterIPs, "must be empty when `clusterIP` is not specified"))
}
}
@ -6454,7 +6472,7 @@ func validateUpgradeDowngradeClusterIPs(oldService, service *core.Service) field
// user *must* set IPFamilyPolicy == SingleStack
if len(service.Spec.ClusterIPs) == 1 {
if service.Spec.IPFamilyPolicy == nil || *(service.Spec.IPFamilyPolicy) != core.IPFamilyPolicySingleStack {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "clusterIPs").Index(0), service.Spec.ClusterIPs, "`ipFamilyPolicy` must be set to 'SingleStack' when releasing the secondary clusterIP"))
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy, "must be set to 'SingleStack' when releasing the secondary clusterIP"))
}
}
case len(oldService.Spec.ClusterIPs) < len(service.Spec.ClusterIPs):
@ -6518,7 +6536,7 @@ func validateUpgradeDowngradeIPFamilies(oldService, service *core.Service) field
// user *must* set IPFamilyPolicy == SingleStack
if len(service.Spec.IPFamilies) == 1 {
if service.Spec.IPFamilyPolicy == nil || *(service.Spec.IPFamilyPolicy) != core.IPFamilyPolicySingleStack {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "clusterIPs").Index(0), service.Spec.ClusterIPs, "`ipFamilyPolicy` must be set to 'SingleStack' when releasing the secondary ipFamily"))
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy, "must be set to 'SingleStack' when releasing the secondary ipFamily"))
}
}
case len(oldService.Spec.IPFamilies) < len(service.Spec.IPFamilies):

View File

@ -11003,6 +11003,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid load balancer protocol UDP 1",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports[0].Protocol = "UDP"
},
@ -11012,6 +11013,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid load balancer protocol UDP 2",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports[0] = core.ServicePort{Name: "q", Port: 12345, Protocol: "UDP", TargetPort: intstr.FromInt(12345)}
},
@ -11021,6 +11023,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "load balancer with mix protocol",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "UDP", TargetPort: intstr.FromInt(12345)})
},
@ -11075,6 +11078,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type - loadbalancer",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
},
numErrs: 0,
@ -11083,6 +11087,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type - loadbalancer with allocateLoadBalancerNodePorts=false",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(false)
},
numErrs: 0,
@ -11091,6 +11096,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "invalid type - missing AllocateLoadBalancerNodePorts for loadbalancer type",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
},
numErrs: 1,
},
@ -11098,6 +11104,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type loadbalancer 2 ports",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", TargetPort: intstr.FromInt(12345)})
},
@ -11107,6 +11114,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid external load balancer 2 ports",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", TargetPort: intstr.FromInt(12345)})
},
@ -11116,6 +11124,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "duplicate nodeports",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(1)})
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "r", Port: 2, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(2)})
},
@ -11125,6 +11134,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "duplicate nodeports (different protocols)",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(1)})
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "r", Port: 2, Protocol: "UDP", NodePort: 1, TargetPort: intstr.FromInt(2)})
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "s", Port: 3, Protocol: "SCTP", NodePort: 1, TargetPort: intstr.FromInt(3)})
@ -11161,6 +11171,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type - nodeport",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
},
numErrs: 0,
},
@ -11168,6 +11179,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type - loadbalancer with allocateLoadBalancerNodePorts=true",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
},
numErrs: 0,
@ -11176,6 +11188,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type loadbalancer 2 ports",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", TargetPort: intstr.FromInt(12345)})
},
@ -11185,6 +11198,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type loadbalancer with NodePort",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345, TargetPort: intstr.FromInt(12345)})
},
@ -11194,6 +11208,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type=NodePort service with NodePort",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345, TargetPort: intstr.FromInt(12345)})
},
numErrs: 0,
@ -11202,6 +11217,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type=NodePort service without NodePort",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", TargetPort: intstr.FromInt(12345)})
},
numErrs: 0,
@ -11226,6 +11242,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "invalid public service with duplicate NodePort",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "p1", Port: 1, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(1)})
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "p2", Port: 2, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(2)})
},
@ -11235,6 +11252,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid type=LoadBalancer",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", TargetPort: intstr.FromInt(12345)})
},
@ -11246,6 +11264,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "invalid port type=LoadBalancer",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "kubelet", Port: 10250, Protocol: "TCP", TargetPort: intstr.FromInt(12345)})
},
@ -11255,6 +11274,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid LoadBalancer source range annotation",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Annotations[core.AnnotationLoadBalancerSourceRangesKey] = "1.2.3.4/8, 5.6.7.8/16"
},
@ -11264,6 +11284,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "empty LoadBalancer source range annotation",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Annotations[core.AnnotationLoadBalancerSourceRangesKey] = ""
},
@ -11280,6 +11301,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "invalid LoadBalancer source range annotation (invalid CIDR)",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Annotations[core.AnnotationLoadBalancerSourceRangesKey] = "1.2.3.4/33"
},
@ -11296,6 +11318,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid LoadBalancer source range",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.LoadBalancerSourceRanges = []string{"1.2.3.4/8", "5.6.7.8/16"}
},
@ -11305,6 +11328,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "empty LoadBalancer source range",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.LoadBalancerSourceRanges = []string{" "}
},
@ -11314,6 +11338,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "invalid LoadBalancer source range",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.LoadBalancerSourceRanges = []string{"foo.bar"}
},
@ -11369,6 +11394,7 @@ func TestValidateServiceCreate(t *testing.T) {
s.Spec.ClusterIP = "None"
s.Spec.ClusterIPs = []string{"None"}
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
},
numErrs: 1,
@ -11377,6 +11403,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "invalid node port with clusterIP None",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.Ports = append(s.Spec.Ports, core.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(1)})
s.Spec.ClusterIP = "None"
s.Spec.ClusterIPs = []string{"None"}
@ -11463,6 +11490,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "sessionAffinityConfig can't be set when session affinity is None",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.SessionAffinity = core.ServiceAffinityNone
s.Spec.SessionAffinityConfig = &core.SessionAffinityConfig{
@ -11916,6 +11944,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "valid LoadBalancerClass when type is LoadBalancer",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-load-balancer-class")
},
@ -11925,6 +11954,7 @@ func TestValidateServiceCreate(t *testing.T) {
name: "invalid LoadBalancerClass",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
s.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
s.Spec.LoadBalancerClass = utilpointer.StringPtr("Bad/LoadBalancerClass")
},
@ -11955,7 +11985,7 @@ func TestValidateServiceCreate(t *testing.T) {
}
}
func TestValidateServiceExternalTrafficFieldsCombination(t *testing.T) {
func TestValidateServiceExternalTrafficPolicy(t *testing.T) {
testCases := []struct {
name string
tweakSvc func(svc *core.Service) // Given a basic valid service, each test case can customize it.
@ -12014,12 +12044,26 @@ func TestValidateServiceExternalTrafficFieldsCombination(t *testing.T) {
},
numErrs: 2,
},
{
name: "externalTrafficPolicy is required on NodePort service",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeNodePort
},
numErrs: 1,
},
{
name: "externalTrafficPolicy is required on LoadBalancer service",
tweakSvc: func(s *core.Service) {
s.Spec.Type = core.ServiceTypeLoadBalancer
},
numErrs: 1,
},
}
for _, tc := range testCases {
svc := makeValidService()
tc.tweakSvc(&svc)
errs := ValidateServiceExternalTrafficFieldsCombination(&svc)
errs := validateServiceExternalTrafficPolicy(&svc)
if len(errs) != tc.numErrs {
t.Errorf("Unexpected error list for case %q: %v", tc.name, errs.ToAggregate())
}
@ -13510,6 +13554,7 @@ func TestValidateServiceUpdate(t *testing.T) {
name: "change type",
tweakSvc: func(oldSvc, newSvc *core.Service) {
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
},
numErrs: 0,
@ -13525,6 +13570,7 @@ func TestValidateServiceUpdate(t *testing.T) {
name: "change type -> nodeport",
tweakSvc: func(oldSvc, newSvc *core.Service) {
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
},
numErrs: 0,
},
@ -13534,6 +13580,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeLoadBalancer
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerSourceRanges = []string{"10.0.0.0/8"}
},
@ -13546,6 +13593,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
oldSvc.Spec.LoadBalancerSourceRanges = []string{"10.0.0.0/8"}
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerSourceRanges = []string{"10.100.0.0/16"}
},
@ -13557,6 +13605,7 @@ func TestValidateServiceUpdate(t *testing.T) {
newSvc.Spec.ClusterIP = "None"
newSvc.Spec.ClusterIPs = []string{"None"}
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
},
numErrs: 1,
@ -13630,6 +13679,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeClusterIP
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
oldSvc.Spec.ClusterIP = "1.2.3.4"
oldSvc.Spec.ClusterIPs = []string{"1.2.3.4"}
@ -13644,6 +13694,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeClusterIP
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
oldSvc.Spec.ClusterIP = ""
oldSvc.Spec.ClusterIPs = nil
@ -13658,6 +13709,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeClusterIP
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
oldSvc.Spec.ClusterIP = "1.2.3.4"
@ -13673,6 +13725,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeClusterIP
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
oldSvc.Spec.ClusterIP = ""
@ -13689,6 +13742,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeLoadBalancer
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(false)
},
numErrs: 0,
@ -13699,6 +13753,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeLoadBalancer
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(false)
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
},
numErrs: 0,
@ -13708,6 +13763,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
oldSvc.Spec.ClusterIP = "1.2.3.4"
oldSvc.Spec.ClusterIPs = []string{"1.2.3.4"}
@ -13722,6 +13778,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
oldSvc.Spec.ClusterIP = ""
oldSvc.Spec.ClusterIPs = nil
@ -13764,6 +13821,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
oldSvc.Spec.ClusterIP = "1.2.3.4"
@ -13779,6 +13837,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
oldSvc.Spec.ClusterIP = ""
@ -13795,6 +13854,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeLoadBalancer
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
oldSvc.Spec.ClusterIP = "1.2.3.4"
@ -13811,6 +13871,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeLoadBalancer
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
oldSvc.Spec.ClusterIP = ""
@ -13857,6 +13918,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeLoadBalancer
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
oldSvc.Spec.ClusterIP = "1.2.3.4"
oldSvc.Spec.ClusterIPs = []string{"1.2.3.4"}
@ -13872,6 +13934,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeLoadBalancer
oldSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
oldSvc.Spec.ClusterIP = ""
oldSvc.Spec.ClusterIPs = nil
@ -13914,6 +13977,7 @@ func TestValidateServiceUpdate(t *testing.T) {
tweakSvc: func(oldSvc, newSvc *core.Service) {
oldSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
oldSvc.Spec.Ports = append(oldSvc.Spec.Ports, core.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(1)})
newSvc.Spec.Ports = append(newSvc.Spec.Ports, core.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1, TargetPort: intstr.FromInt(1)})
@ -14366,6 +14430,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-old")
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-old")
},
@ -14379,6 +14444,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-old")
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-new")
},
@ -14392,6 +14458,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-old")
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerClass = nil
},
@ -14405,6 +14472,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.LoadBalancerClass = nil
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-new")
},
@ -14416,6 +14484,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeClusterIP
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-load-balancer-class")
},
@ -14427,6 +14496,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeClusterIP
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerClass = nil
},
@ -14438,6 +14508,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeClusterIP
newSvc.Spec.Type = core.ServiceTypeLoadBalancer
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
newSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("Bad/LoadBalancerclass")
},
@ -14469,6 +14540,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-load-balancer-class")
},
numErrs: 2,
@ -14505,6 +14577,7 @@ func TestValidateServiceUpdate(t *testing.T) {
oldSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-load-balancer-class")
newSvc.Spec.Type = core.ServiceTypeNodePort
newSvc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeCluster
newSvc.Spec.LoadBalancerClass = utilpointer.StringPtr("test.com/test-load-balancer-class")
},
numErrs: 2,

View File

@ -254,19 +254,25 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil)
serviceIPAllocators := map[api.IPFamily]ipallocator.Interface{
serviceClusterIPAllocator.IPFamily(): serviceClusterIPAllocator,
}
if secondaryServiceClusterIPAllocator != nil {
serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator
}
serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
restOptionsGetter,
serviceClusterIPAllocator.IPFamily(),
serviceIPAllocators,
serviceNodePortAllocator,
endpointsStorage,
podStorage.Pod,
c.ProxyTransport)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage,
endpointsStorage,
podStorage.Pod,
serviceClusterIPAllocator,
secondaryServiceClusterIPAllocator,
serviceNodePortAllocator,
c.ProxyTransport)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
@ -283,8 +289,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest,
"services/proxy": serviceRestProxy,
"services": serviceRESTStorage,
"services/proxy": serviceRESTProxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,

View File

@ -37,6 +37,9 @@ type Interface interface {
CIDR() net.IPNet
IPFamily() api.IPFamily
Has(ip net.IP) bool
// DryRun offers a way to try operations without persisting them.
DryRun() Interface
}
var (
@ -46,11 +49,12 @@ var (
)
type ErrNotInRange struct {
IP net.IP
ValidRange string
}
func (e *ErrNotInRange) Error() string {
return fmt.Sprintf("provided IP is not in the valid range. The range of valid IPs is %s", e.ValidRange)
return fmt.Sprintf("the provided IP (%v) is not in the valid range. The range of valid IPs is %s", e.IP, e.ValidRange)
}
// Range is a contiguous block of IPs that can be allocated atomically.
@ -98,11 +102,13 @@ func New(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) (*Range,
}
} else {
family = api.IPv4Protocol
// Don't use the IPv4 network's broadcast address.
// Don't use the IPv4 network's broadcast address, but don't just
// Allocate() it - we don't ever want to be able to release it.
max--
}
// Don't use the network's ".0" address.
// Don't use the network's ".0" address, but don't just Allocate() it - we
// don't ever want to be able to release it.
base.Add(base, big.NewInt(1))
max--
@ -114,6 +120,7 @@ func New(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) (*Range,
}
var err error
r.alloc, err = allocatorFactory(r.max, rangeSpec)
return &r, err
}
@ -162,18 +169,35 @@ func (r *Range) CIDR() net.IPNet {
return *r.net
}
// DryRun returns a non-persisting form of this Range.
func (r *Range) DryRun() Interface {
return dryRunRange{r}
}
// For clearer code.
const dryRunTrue = true
const dryRunFalse = false
// Allocate attempts to reserve the provided IP. ErrNotInRange or
// ErrAllocated will be returned if the IP is not valid for this range
// or has already been reserved. ErrFull will be returned if there
// are no addresses left.
func (r *Range) Allocate(ip net.IP) error {
return r.allocate(ip, dryRunFalse)
}
func (r *Range) allocate(ip net.IP, dryRun bool) error {
label := r.CIDR()
ok, offset := r.contains(ip)
if !ok {
// update metrics
clusterIPAllocationErrors.WithLabelValues(label.String()).Inc()
return &ErrNotInRange{r.net.String()}
return &ErrNotInRange{ip, r.net.String()}
}
if dryRun {
// Don't bother to check whether the IP is actually free. It's racy and
// not worth the effort to plumb any further.
return nil
}
allocated, err := r.alloc.Allocate(offset)
@ -200,7 +224,17 @@ func (r *Range) Allocate(ip net.IP) error {
// AllocateNext reserves one of the IPs from the pool. ErrFull may
// be returned if there are no addresses left.
func (r *Range) AllocateNext() (net.IP, error) {
return r.allocateNext(dryRunFalse)
}
func (r *Range) allocateNext(dryRun bool) (net.IP, error) {
label := r.CIDR()
if dryRun {
// Don't bother finding a free value. It's racy and not worth the
// effort to plumb any further.
return r.CIDR().IP, nil
}
offset, ok, err := r.alloc.AllocateNext()
if err != nil {
// update metrics
@ -226,10 +260,17 @@ func (r *Range) AllocateNext() (net.IP, error) {
// unallocated IP or an IP out of the range is a no-op and
// returns no error.
func (r *Range) Release(ip net.IP) error {
return r.release(ip, dryRunFalse)
}
func (r *Range) release(ip net.IP, dryRun bool) error {
ok, offset := r.contains(ip)
if !ok {
return nil
}
if dryRun {
return nil
}
err := r.alloc.Release(offset)
if err == nil {
@ -312,3 +353,40 @@ func (r *Range) contains(ip net.IP) (bool, int) {
func calculateIPOffset(base *big.Int, ip net.IP) int {
return int(big.NewInt(0).Sub(netutils.BigForIP(ip), base).Int64())
}
// dryRunRange is a shim to satisfy Interface without persisting state.
type dryRunRange struct {
real *Range
}
func (dry dryRunRange) Allocate(ip net.IP) error {
return dry.real.allocate(ip, dryRunTrue)
}
func (dry dryRunRange) AllocateNext() (net.IP, error) {
return dry.real.allocateNext(dryRunTrue)
}
func (dry dryRunRange) Release(ip net.IP) error {
return dry.real.release(ip, dryRunTrue)
}
func (dry dryRunRange) ForEach(cb func(net.IP)) {
dry.real.ForEach(cb)
}
func (dry dryRunRange) CIDR() net.IPNet {
return dry.real.CIDR()
}
func (dry dryRunRange) IPFamily() api.IPFamily {
return dry.real.IPFamily()
}
func (dry dryRunRange) DryRun() Interface {
return dry
}
func (dry dryRunRange) Has(ip net.IP) bool {
return dry.real.Has(ip)
}

View File

@ -76,34 +76,34 @@ func TestAllocate(t *testing.T) {
}
t.Logf("base: %v", r.base.Bytes())
if f := r.Free(); f != tc.free {
t.Errorf("Test %s unexpected free %d", tc.name, f)
t.Errorf("[%s] wrong free: expected %d, got %d", tc.name, tc.free, f)
}
rCIDR := r.CIDR()
if rCIDR.String() != tc.cidr {
t.Errorf("allocator returned a different cidr")
t.Errorf("[%s] wrong CIDR: expected %v, got %v", tc.name, tc.cidr, rCIDR.String())
}
if r.IPFamily() != tc.family {
t.Errorf("allocator returned wrong IP family")
t.Errorf("[%s] wrong IP family: expected %v, got %v", tc.name, tc.family, r.IPFamily())
}
if f := r.Used(); f != 0 {
t.Errorf("Test %s unexpected used %d", tc.name, f)
t.Errorf("[%s]: wrong used: expected %d, got %d", tc.name, 0, f)
}
found := sets.NewString()
count := 0
for r.Free() > 0 {
ip, err := r.AllocateNext()
if err != nil {
t.Fatalf("Test %s error @ %d: %v", tc.name, count, err)
t.Fatalf("[%s] error @ %d: %v", tc.name, count, err)
}
count++
if !cidr.Contains(ip) {
t.Fatalf("Test %s allocated %s which is outside of %s", tc.name, ip, cidr)
t.Fatalf("[%s] allocated %s which is outside of %s", tc.name, ip, cidr)
}
if found.Has(ip.String()) {
t.Fatalf("Test %s allocated %s twice @ %d", tc.name, ip, count)
t.Fatalf("[%s] allocated %s twice @ %d", tc.name, ip, count)
}
found.Insert(ip.String())
}
@ -116,17 +116,17 @@ func TestAllocate(t *testing.T) {
t.Fatal(err)
}
if f := r.Free(); f != 1 {
t.Errorf("Test %s unexpected free %d", tc.name, f)
t.Errorf("[%s] wrong free: expected %d, got %d", tc.name, 1, f)
}
if f := r.Used(); f != (tc.free - 1) {
t.Errorf("Test %s unexpected free %d", tc.name, f)
t.Errorf("[%s] wrong free: expected %d, got %d", tc.name, tc.free-1, f)
}
ip, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
}
if !released.Equal(ip) {
t.Errorf("Test %s unexpected %s : %s", tc.name, ip, released)
t.Errorf("[%s] unexpected %s : %s", tc.name, ip, released)
}
if err := r.Release(released); err != nil {
@ -142,19 +142,19 @@ func TestAllocate(t *testing.T) {
t.Fatal(err)
}
if f := r.Free(); f != 1 {
t.Errorf("Test %s unexpected free %d", tc.name, f)
t.Errorf("[%s] wrong free: expected %d, got %d", tc.name, 1, f)
}
if f := r.Used(); f != (tc.free - 1) {
t.Errorf("Test %s unexpected free %d", tc.name, f)
t.Errorf("[%s] wrong free: expected %d, got %d", tc.name, tc.free-1, f)
}
if err := r.Allocate(released); err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 0 {
t.Errorf("Test %s unexpected free %d", tc.name, f)
t.Errorf("[%s] wrong free: expected %d, got %d", tc.name, 0, f)
}
if f := r.Used(); f != tc.free {
t.Errorf("Test %s unexpected free %d", tc.name, f)
t.Errorf("[%s] wrong free: expected %d, got %d", tc.name, tc.free, f)
}
}
}
@ -514,3 +514,67 @@ func expectMetrics(t *testing.T, label string, em testMetrics) {
t.Fatalf("metrics error: expected %v, received %v", em, m)
}
}
func TestDryRun(t *testing.T) {
testCases := []struct {
name string
cidr string
family api.IPFamily
}{{
name: "IPv4",
cidr: "192.168.1.0/24",
family: api.IPv4Protocol,
}, {
name: "IPv6",
cidr: "2001:db8:1::/48",
family: api.IPv6Protocol,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, cidr, err := netutils.ParseCIDRSloppy(tc.cidr)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}
r, err := NewInMemory(cidr)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}
baseUsed := r.Used()
rCIDR := r.DryRun().CIDR()
if rCIDR.String() != tc.cidr {
t.Errorf("allocator returned a different cidr")
}
if r.DryRun().IPFamily() != tc.family {
t.Errorf("allocator returned wrong IP family")
}
expectUsed := func(t *testing.T, r *Range, expect int) {
t.Helper()
if u := r.Used(); u != expect {
t.Errorf("unexpected used count: got %d, wanted %d", u, expect)
}
}
expectUsed(t, r, baseUsed)
err = r.DryRun().Allocate(netutils.AddIPOffset(netutils.BigForIP(cidr.IP), 1))
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}
expectUsed(t, r, baseUsed)
_, err = r.DryRun().AllocateNext()
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}
expectUsed(t, r, baseUsed)
if err := r.DryRun().Release(cidr.IP); err != nil {
t.Fatalf("unexpected failure: %v", err)
}
expectUsed(t, r, baseUsed)
})
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -18,36 +18,78 @@ package storage
import (
"context"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"strconv"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/printers"
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
printerstorage "k8s.io/kubernetes/pkg/printers/storage"
"k8s.io/kubernetes/pkg/registry/core/service"
registry "k8s.io/kubernetes/pkg/registry/core/service"
svcreg "k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
netutil "k8s.io/utils/net"
)
type GenericREST struct {
*genericregistry.Store
primaryIPFamily *api.IPFamily
secondaryFamily *api.IPFamily
type EndpointsStorage interface {
rest.Getter
rest.GracefulDeleter
}
// NewGenericREST returns a RESTStorage object that will work against services.
func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet, hasSecondary bool) (*GenericREST, *StatusREST, error) {
strategy, _ := registry.StrategyForServiceCIDRs(serviceCIDR, hasSecondary)
type PodStorage interface {
rest.Getter
}
type REST struct {
*genericregistry.Store
primaryIPFamily api.IPFamily
secondaryIPFamily api.IPFamily
alloc Allocators
endpoints EndpointsStorage
pods PodStorage
proxyTransport http.RoundTripper
}
var (
_ rest.CategoriesProvider = &REST{}
_ rest.ShortNamesProvider = &REST{}
_ rest.StorageVersionProvider = &REST{}
_ rest.ResetFieldsStrategy = &REST{}
_ rest.Redirector = &REST{}
)
// NewREST returns a REST object that will work against services.
func NewREST(
optsGetter generic.RESTOptionsGetter,
serviceIPFamily api.IPFamily,
ipAllocs map[api.IPFamily]ipallocator.Interface,
portAlloc portallocator.Interface,
endpoints EndpointsStorage,
pods PodStorage,
proxyTransport http.RoundTripper) (*REST, *StatusREST, *svcreg.ProxyREST, error) {
strategy, _ := svcreg.StrategyForServiceCIDRs(ipAllocs[serviceIPFamily].CIDR(), len(ipAllocs) > 1)
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Service{} },
@ -64,7 +106,7 @@ func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet,
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
statusStore := *store
@ -72,43 +114,53 @@ func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet,
statusStore.UpdateStrategy = statusStrategy
statusStore.ResetFieldsStrategy = statusStrategy
ipv4 := api.IPv4Protocol
ipv6 := api.IPv6Protocol
var primaryIPFamily *api.IPFamily
var secondaryFamily *api.IPFamily
if netutil.IsIPv6CIDR(&serviceCIDR) {
primaryIPFamily = &ipv6
if hasSecondary {
secondaryFamily = &ipv4
}
} else {
primaryIPFamily = &ipv4
if hasSecondary {
secondaryFamily = &ipv6
}
var primaryIPFamily api.IPFamily = serviceIPFamily
var secondaryIPFamily api.IPFamily = "" // sentinel value
if len(ipAllocs) > 1 {
secondaryIPFamily = otherFamily(serviceIPFamily)
}
genericStore := &REST{
Store: store,
primaryIPFamily: primaryIPFamily,
secondaryIPFamily: secondaryIPFamily,
alloc: makeAlloc(serviceIPFamily, ipAllocs, portAlloc),
endpoints: endpoints,
pods: pods,
proxyTransport: proxyTransport,
}
genericStore := &GenericREST{store, primaryIPFamily, secondaryFamily}
store.Decorator = genericStore.defaultOnRead
store.AfterDelete = genericStore.afterDelete
store.BeginCreate = genericStore.beginCreate
store.BeginUpdate = genericStore.beginUpdate
return genericStore, &StatusREST{store: &statusStore}, nil
return genericStore, &StatusREST{store: &statusStore}, &svcreg.ProxyREST{Redirector: genericStore, ProxyTransport: proxyTransport}, nil
}
// otherFamily returns the non-selected IPFamily. This assumes the input is
// valid.
func otherFamily(fam api.IPFamily) api.IPFamily {
if fam == api.IPv4Protocol {
return api.IPv6Protocol
}
return api.IPv4Protocol
}
var (
_ rest.ShortNamesProvider = &GenericREST{}
_ rest.CategoriesProvider = &GenericREST{}
_ rest.ShortNamesProvider = &REST{}
_ rest.CategoriesProvider = &REST{}
)
// ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
func (r *GenericREST) ShortNames() []string {
func (r *REST) ShortNames() []string {
return []string{"svc"}
}
// Categories implements the CategoriesProvider interface. Returns a list of categories a resource is part of.
func (r *GenericREST) Categories() []string {
func (r *REST) Categories() []string {
return []string{"all"}
}
// StatusREST implements the GenericREST endpoint for changing the status of a service.
// StatusREST implements the REST endpoint for changing the status of a service.
type StatusREST struct {
store *genericregistry.Store
}
@ -134,12 +186,31 @@ func (r *StatusREST) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
return r.store.GetResetFields()
}
// We have a lot of functions that take a pair of "before" and "after" or
// "oldSvc" and "newSvc" args. Convention across the codebase is to pass them
// as (new, old), but it's easy to screw up when they are the same type.
//
// These types force us to pay attention. If the order of the arguments
// matters, please receive them as:
// func something(after After, before Before) {
// oldSvc, newSvc := before.Service, after.Service
//
// If the order of arguments DOES NOT matter, please receive them as:
// func something(lhs, rhs *api.Service) {
type Before struct {
*api.Service
}
type After struct {
*api.Service
}
// defaultOnRead sets interlinked fields that were not previously set on read.
// We can't do this in the normal defaulting path because that same logic
// applies on Get, Create, and Update, but we need to distinguish between them.
//
// This will be called on both Service and ServiceList types.
func (r *GenericREST) defaultOnRead(obj runtime.Object) {
func (r *REST) defaultOnRead(obj runtime.Object) {
switch s := obj.(type) {
case *api.Service:
r.defaultOnReadService(s)
@ -152,7 +223,7 @@ func (r *GenericREST) defaultOnRead(obj runtime.Object) {
}
// defaultOnReadServiceList defaults a ServiceList.
func (r *GenericREST) defaultOnReadServiceList(serviceList *api.ServiceList) {
func (r *REST) defaultOnReadServiceList(serviceList *api.ServiceList) {
if serviceList == nil {
return
}
@ -163,15 +234,14 @@ func (r *GenericREST) defaultOnReadServiceList(serviceList *api.ServiceList) {
}
// defaultOnReadService defaults a single Service.
func (r *GenericREST) defaultOnReadService(service *api.Service) {
func (r *REST) defaultOnReadService(service *api.Service) {
if service == nil {
return
}
// We might find Services that were written before ClusterIP became plural.
// We still want to present a consistent view of them.
// NOTE: the args are (old, new)
svcreg.NormalizeClusterIPs(nil, service)
normalizeClusterIPs(After{service}, Before{nil})
// The rest of this does not apply unless dual-stack is enabled.
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
@ -194,7 +264,7 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
preferDualStack := api.IPFamilyPolicyPreferDualStack
// headless services
if len(service.Spec.ClusterIPs) == 1 && service.Spec.ClusterIPs[0] == api.ClusterIPNone {
service.Spec.IPFamilies = []api.IPFamily{*r.primaryIPFamily}
service.Spec.IPFamilies = []api.IPFamily{r.primaryIPFamily}
// headless+selectorless
// headless+selectorless takes both families. Why?
@ -203,7 +273,7 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
// it to PreferDualStack on any cluster (single or dualstack configured).
if len(service.Spec.Selector) == 0 {
service.Spec.IPFamilyPolicy = &preferDualStack
if *r.primaryIPFamily == api.IPv4Protocol {
if r.primaryIPFamily == api.IPv4Protocol {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
} else {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv4Protocol)
@ -214,8 +284,8 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
// selector and will have to follow how the cluster is configured. If the cluster is
// configured to dual stack then the service defaults to PreferDualStack. Otherwise we
// default it to SingleStack.
if r.secondaryFamily != nil {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, *r.secondaryFamily)
if r.secondaryIPFamily != "" {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, r.secondaryIPFamily)
service.Spec.IPFamilyPolicy = &preferDualStack
} else {
service.Spec.IPFamilyPolicy = &singleStack
@ -240,3 +310,336 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
}
}
}
func (r *REST) afterDelete(obj runtime.Object, options *metav1.DeleteOptions) {
svc := obj.(*api.Service)
// Normally this defaulting is done automatically, but the hook (Decorator)
// is called at the end of this process, and we want the fully-formed
// object.
r.defaultOnReadService(svc)
// Only perform the cleanup if this is a non-dryrun deletion
if !dryrun.IsDryRun(options.DryRun) {
// It would be better if we had the caller context, but that changes
// this hook signature.
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), svc.Namespace)
// TODO: This is clumsy. It was added for fear that the endpoints
// controller might lag, and we could end up rusing the service name
// with old endpoints. We should solve that better and remove this, or
// else we should do this for EndpointSlice, too.
_, _, err := r.endpoints.Delete(ctx, svc.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("delete service endpoints %s/%s failed: %v", svc.Name, svc.Namespace, err)
}
r.alloc.releaseAllocatedResources(svc)
}
}
func (r *REST) beginCreate(ctx context.Context, obj runtime.Object, options *metav1.CreateOptions) (genericregistry.FinishFunc, error) {
svc := obj.(*api.Service)
// Make sure ClusterIP and ClusterIPs are in sync. This has to happen
// early, before anyone looks at them.
normalizeClusterIPs(After{svc}, Before{nil})
// Allocate IPs and ports. If we had a transactional store, this would just
// be part of the larger transaction. We don't have that, so we have to do
// it manually. This has to happen here and not in any earlier hooks (e.g.
// defaulting) because it needs to be aware of flags and be able to access
// API storage.
txn, err := r.alloc.allocateCreate(svc, dryrun.IsDryRun(options.DryRun))
if err != nil {
return nil, err
}
// Our cleanup callback
finish := func(_ context.Context, success bool) {
if success {
txn.Commit()
} else {
txn.Revert()
}
}
return finish, nil
}
func (r *REST) beginUpdate(ctx context.Context, obj, oldObj runtime.Object, options *metav1.UpdateOptions) (genericregistry.FinishFunc, error) {
newSvc := obj.(*api.Service)
oldSvc := oldObj.(*api.Service)
// Fix up allocated values that the client may have not specified (for
// idempotence).
patchAllocatedValues(After{newSvc}, Before{oldSvc})
// Make sure ClusterIP and ClusterIPs are in sync. This has to happen
// early, before anyone looks at them.
normalizeClusterIPs(After{newSvc}, Before{oldSvc})
// Allocate and initialize fields.
txn, err := r.alloc.allocateUpdate(After{newSvc}, Before{oldSvc}, dryrun.IsDryRun(options.DryRun))
if err != nil {
return nil, err
}
// Our cleanup callback
finish := func(_ context.Context, success bool) {
if success {
txn.Commit()
} else {
txn.Revert()
}
}
return finish, nil
}
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func (r *REST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.RoundTripper, error) {
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
if !valid {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
}
// If a port *number* was specified, find the corresponding service port name
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
obj, err := r.Get(ctx, svcName, &metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
svc := obj.(*api.Service)
found := false
for _, svcPort := range svc.Spec.Ports {
if int64(svcPort.Port) == portNum {
// use the declared port's name
portStr = svcPort.Name
found = true
break
}
}
if !found {
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
}
}
obj, err := r.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
eps := obj.(*api.Endpoints)
if len(eps.Subsets) == 0 {
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
}
// Pick a random Subset to start searching from.
ssSeed := rand.Intn(len(eps.Subsets))
// Find a Subset that has the port.
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
if len(ss.Addresses) == 0 {
continue
}
for i := range ss.Ports {
if ss.Ports[i].Name == portStr {
addrSeed := rand.Intn(len(ss.Addresses))
// This is a little wonky, but it's expensive to test for the presence of a Pod
// So we repeatedly try at random and validate it, this means that for an invalid
// service with a lot of endpoints we're going to potentially make a lot of calls,
// but in the expected case we'll only make one.
for try := 0; try < len(ss.Addresses); try++ {
addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
// TODO(thockin): do we really need this check?
if err := isValidAddress(ctx, &addr, r.pods); err != nil {
utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
continue
}
ip := addr.IP
port := int(ss.Ports[i].Port)
return &url.URL{
Scheme: svcScheme,
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, r.proxyTransport, nil
}
utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
}
}
}
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
func isValidAddress(ctx context.Context, addr *api.EndpointAddress, pods rest.Getter) error {
if addr.TargetRef == nil {
return fmt.Errorf("Address has no target ref, skipping: %v", addr)
}
if genericapirequest.NamespaceValue(ctx) != addr.TargetRef.Namespace {
return fmt.Errorf("Address namespace doesn't match context namespace")
}
obj, err := pods.Get(ctx, addr.TargetRef.Name, &metav1.GetOptions{})
if err != nil {
return err
}
pod, ok := obj.(*api.Pod)
if !ok {
return fmt.Errorf("failed to cast to pod: %v", obj)
}
if pod == nil {
return fmt.Errorf("pod is missing, skipping (%s/%s)", addr.TargetRef.Namespace, addr.TargetRef.Name)
}
for _, podIP := range pod.Status.PodIPs {
if podIP.IP == addr.IP {
return nil
}
}
return fmt.Errorf("pod ip(s) doesn't match endpoint ip, skipping: %v vs %s (%s/%s)", pod.Status.PodIPs, addr.IP, addr.TargetRef.Namespace, addr.TargetRef.Name)
}
// normalizeClusterIPs adjust clusterIPs based on ClusterIP. This must not
// consider any other fields.
func normalizeClusterIPs(after After, before Before) {
oldSvc, newSvc := before.Service, after.Service
// In all cases here, we don't need to over-think the inputs. Validation
// will be called on the new object soon enough. All this needs to do is
// try to divine what user meant with these linked fields. The below
// is verbosely written for clarity.
// **** IMPORTANT *****
// as a governing rule. User must (either)
// -- Use singular only (old client)
// -- singular and plural fields (new clients)
if oldSvc == nil {
// This was a create operation.
// User specified singular and not plural (e.g. an old client), so init
// plural for them.
if len(newSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
return
}
// we don't init singular based on plural because
// new client must use both fields
// Either both were not specified (will be allocated) or both were
// specified (will be validated).
return
}
// This was an update operation
// ClusterIPs were cleared by an old client which was trying to patch
// some field and didn't provide ClusterIPs
if len(oldSvc.Spec.ClusterIPs) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
// if ClusterIP is the same, then it is an old client trying to
// patch service and didn't provide ClusterIPs
if oldSvc.Spec.ClusterIP == newSvc.Spec.ClusterIP {
newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
}
}
// clusterIP is not the same
if oldSvc.Spec.ClusterIP != newSvc.Spec.ClusterIP {
// this is a client trying to clear it
if len(oldSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIP) == 0 {
// if clusterIPs are the same, then clear on their behalf
if sameClusterIPs(oldSvc, newSvc) {
newSvc.Spec.ClusterIPs = nil
}
// if they provided nil, then we are fine (handled by patching case above)
// if they changed it then validation will catch it
} else {
// ClusterIP has changed but not cleared *and* ClusterIPs are the same
// then we set ClusterIPs based on ClusterIP
if sameClusterIPs(oldSvc, newSvc) {
newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
}
}
}
}
// patchAllocatedValues allows clients to avoid a read-modify-write cycle while
// preserving values that we allocated on their behalf. For example, they
// might create a Service without specifying the ClusterIP, in which case we
// allocate one. If they resubmit that same YAML, we want it to succeed.
func patchAllocatedValues(after After, before Before) {
oldSvc, newSvc := before.Service, after.Service
if needsClusterIP(oldSvc) && needsClusterIP(newSvc) {
if newSvc.Spec.ClusterIP == "" {
newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP
}
if len(newSvc.Spec.ClusterIPs) == 0 && len(oldSvc.Spec.ClusterIPs) > 0 {
newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
}
}
if needsNodePort(oldSvc) && needsNodePort(newSvc) {
nodePortsUsed := func(svc *api.Service) sets.Int32 {
used := sets.NewInt32()
for _, p := range svc.Spec.Ports {
if p.NodePort != 0 {
used.Insert(p.NodePort)
}
}
return used
}
// Build a set of all the ports in oldSvc that are also in newSvc. We know
// we can't patch these values.
used := nodePortsUsed(oldSvc).Intersection(nodePortsUsed(newSvc))
// Map NodePorts by name. The user may have changed other properties
// of the port, but we won't see that here.
np := map[string]int32{}
for i := range oldSvc.Spec.Ports {
p := &oldSvc.Spec.Ports[i]
np[p.Name] = p.NodePort
}
// If newSvc is missing values, try to patch them in when we know them and
// they haven't been used for another port.
for i := range newSvc.Spec.Ports {
p := &newSvc.Spec.Ports[i]
if p.NodePort == 0 {
oldVal := np[p.Name]
if !used.Has(oldVal) {
p.NodePort = oldVal
}
}
}
}
if needsHCNodePort(oldSvc) && needsHCNodePort(newSvc) {
if newSvc.Spec.HealthCheckNodePort == 0 {
newSvc.Spec.HealthCheckNodePort = oldSvc.Spec.HealthCheckNodePort
}
}
}
func needsClusterIP(svc *api.Service) bool {
if svc.Spec.Type == api.ServiceTypeExternalName {
return false
}
return true
}
func needsNodePort(svc *api.Service) bool {
if svc.Spec.Type == api.ServiceTypeNodePort || svc.Spec.Type == api.ServiceTypeLoadBalancer {
return true
}
return false
}
func needsHCNodePort(svc *api.Service) bool {
if svc.Spec.Type != api.ServiceTypeLoadBalancer {
return false
}
if svc.Spec.ExternalTrafficPolicy != api.ServiceExternalTrafficPolicyTypeLocal {
return false
}
return true
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,62 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
// transaction represents something that may need to be finalized on success or
// failure of the larger transaction.
type transaction interface {
// Commit tells the transaction to finalize any changes it may have
// pending. This cannot fail, so errors must be handled internally.
Commit()
// Revert tells the transaction to abandon or undo any changes it may have
// pending. This cannot fail, so errors must be handled internally.
Revert()
}
// metaTransaction is a collection of transactions.
type metaTransaction []transaction
func (mt metaTransaction) Commit() {
for _, t := range mt {
t.Commit()
}
}
func (mt metaTransaction) Revert() {
for _, t := range mt {
t.Revert()
}
}
// callbackTransaction is a transaction which calls arbitrary functions.
type callbackTransaction struct {
commit func()
revert func()
}
func (cb callbackTransaction) Commit() {
if cb.commit != nil {
cb.commit()
}
}
func (cb callbackTransaction) Revert() {
if cb.revert != nil {
cb.revert()
}
}

View File

@ -0,0 +1,167 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"testing"
)
func Test_metaTransaction(t *testing.T) {
const initial = 10
var temp int
tests := []struct {
name string
mt metaTransaction
start int
want int
}{{
name: "commit and revert match",
mt: metaTransaction{
callbackTransaction{
commit: func() {
temp = temp + 1
},
revert: func() {
temp = temp - 1
},
},
},
want: 10,
}, {
name: "commit and revert match multiple times",
mt: metaTransaction{
callbackTransaction{
commit: func() {
temp = temp + 1
},
revert: func() {
temp = temp - 1
},
},
callbackTransaction{
commit: func() {
temp = temp + 2
},
revert: func() {
temp = temp - 2
},
},
callbackTransaction{
commit: func() {
temp = temp + 3
},
revert: func() {
temp = temp - 3
},
},
},
want: 10,
}, {
name: "missing revert",
mt: metaTransaction{
callbackTransaction{
commit: func() {
temp = temp + 1
},
revert: func() {
temp = temp - 1
},
},
callbackTransaction{
commit: func() {
temp = temp + 2
},
},
callbackTransaction{
commit: func() {
temp = temp + 3
},
revert: func() {
temp = temp - 3
},
},
},
want: 12,
}, {
name: "missing commit",
mt: metaTransaction{
callbackTransaction{
commit: func() {
temp = temp + 1
},
revert: func() {
temp = temp - 1
},
},
callbackTransaction{
revert: func() {
temp = temp - 2
},
},
callbackTransaction{
commit: func() {
temp = temp + 3
},
revert: func() {
temp = temp - 3
},
},
},
want: 8,
}, {
name: "commit and revert match multiple but different order",
mt: metaTransaction{
callbackTransaction{
commit: func() {
temp = temp + 1
},
revert: func() {
temp = temp - 2
},
},
callbackTransaction{
commit: func() {
temp = temp + 2
},
revert: func() {
temp = temp - 1
},
},
callbackTransaction{
commit: func() {
temp = temp + 3
},
revert: func() {
temp = temp - 3
},
},
},
want: 10,
}}
t.Parallel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
temp = initial
tt.mt.Commit()
tt.mt.Revert()
if temp != tt.want {
t.Fatalf("expected %d got %d", tt.want, temp)
}
})
}
}

View File

@ -109,7 +109,6 @@ func (strategy svcStrategy) PrepareForCreate(ctx context.Context, obj runtime.Ob
service := obj.(*api.Service)
service.Status = api.ServiceStatus{}
NormalizeClusterIPs(nil, service)
dropServiceDisabledFields(service, nil)
}
@ -119,11 +118,8 @@ func (strategy svcStrategy) PrepareForUpdate(ctx context.Context, obj, old runti
oldService := old.(*api.Service)
newService.Status = oldService.Status
patchAllocatedValues(newService, oldService)
NormalizeClusterIPs(oldService, newService)
dropServiceDisabledFields(newService, oldService)
dropTypeDependentFields(newService, oldService)
trimFieldsForDualStackDowngrade(newService, oldService)
}
// Validate validates a new service.
@ -303,126 +299,6 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt
return nil
}
// patchAllocatedValues allows clients to avoid a read-modify-write cycle while
// preserving values that we allocated on their behalf. For example, they
// might create a Service without specifying the ClusterIP, in which case we
// allocate one. If they resubmit that same YAML, we want it to succeed.
func patchAllocatedValues(newSvc, oldSvc *api.Service) {
if needsClusterIP(oldSvc) && needsClusterIP(newSvc) {
if newSvc.Spec.ClusterIP == "" {
newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP
}
if len(newSvc.Spec.ClusterIPs) == 0 {
newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
}
}
if needsNodePort(oldSvc) && needsNodePort(newSvc) {
nodePortsUsed := func(svc *api.Service) sets.Int32 {
used := sets.NewInt32()
for _, p := range svc.Spec.Ports {
if p.NodePort != 0 {
used.Insert(p.NodePort)
}
}
return used
}
// Build a set of all the ports in oldSvc that are also in newSvc. We know
// we can't patch these values.
used := nodePortsUsed(oldSvc).Intersection(nodePortsUsed(newSvc))
// Map NodePorts by name. The user may have changed other properties
// of the port, but we won't see that here.
np := map[string]int32{}
for i := range oldSvc.Spec.Ports {
p := &oldSvc.Spec.Ports[i]
np[p.Name] = p.NodePort
}
// If newSvc is missing values, try to patch them in when we know them and
// they haven't been used for another port.
for i := range newSvc.Spec.Ports {
p := &newSvc.Spec.Ports[i]
if p.NodePort == 0 {
oldVal := np[p.Name]
if !used.Has(oldVal) {
p.NodePort = oldVal
}
}
}
}
if needsHCNodePort(oldSvc) && needsHCNodePort(newSvc) {
if newSvc.Spec.HealthCheckNodePort == 0 {
newSvc.Spec.HealthCheckNodePort = oldSvc.Spec.HealthCheckNodePort
}
}
}
// NormalizeClusterIPs adjust clusterIPs based on ClusterIP. This must not
// consider any other fields.
func NormalizeClusterIPs(oldSvc, newSvc *api.Service) {
// In all cases here, we don't need to over-think the inputs. Validation
// will be called on the new object soon enough. All this needs to do is
// try to divine what user meant with these linked fields. The below
// is verbosely written for clarity.
// **** IMPORTANT *****
// as a governing rule. User must (either)
// -- Use singular only (old client)
// -- singular and plural fields (new clients)
if oldSvc == nil {
// This was a create operation.
// User specified singular and not plural (e.g. an old client), so init
// plural for them.
if len(newSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
return
}
// we don't init singular based on plural because
// new client must use both fields
// Either both were not specified (will be allocated) or both were
// specified (will be validated).
return
}
// This was an update operation
// ClusterIPs were cleared by an old client which was trying to patch
// some field and didn't provide ClusterIPs
if len(oldSvc.Spec.ClusterIPs) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
// if ClusterIP is the same, then it is an old client trying to
// patch service and didn't provide ClusterIPs
if oldSvc.Spec.ClusterIP == newSvc.Spec.ClusterIP {
newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
}
}
// clusterIP is not the same
if oldSvc.Spec.ClusterIP != newSvc.Spec.ClusterIP {
// this is a client trying to clear it
if len(oldSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIP) == 0 {
// if clusterIPs are the same, then clear on their behalf
if sameStringSlice(oldSvc.Spec.ClusterIPs, newSvc.Spec.ClusterIPs) {
newSvc.Spec.ClusterIPs = nil
}
// if they provided nil, then we are fine (handled by patching case above)
// if they changed it then validation will catch it
} else {
// ClusterIP has changed but not cleared *and* ClusterIPs are the same
// then we set ClusterIPs based on ClusterIP
if sameStringSlice(oldSvc.Spec.ClusterIPs, newSvc.Spec.ClusterIPs) {
newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
}
}
}
}
func sameStringSlice(a []string, b []string) bool {
if len(a) != len(b) {
return false
@ -509,9 +385,22 @@ func dropTypeDependentFields(newSvc *api.Service, oldSvc *api.Service) {
newSvc.Spec.LoadBalancerClass = nil
}
// If a user is switching to a type that doesn't need ExternalTrafficPolicy
// AND they did not change this field, it is safe to drop it.
if needsExternalTrafficPolicy(oldSvc) && !needsExternalTrafficPolicy(newSvc) && sameExternalTrafficPolicy(oldSvc, newSvc) {
newSvc.Spec.ExternalTrafficPolicy = api.ServiceExternalTrafficPolicyType("")
}
// NOTE: there are other fields like `selector` which we could wipe.
// Historically we did not wipe them and they are not allocated from
// finite pools, so we are (currently) choosing to leave them alone.
// Clear the load-balancer status if it is no longer appropriate. Although
// LB de-provisioning is actually asynchronous, we don't need to expose the
// user to that complexity.
if newSvc.Spec.Type != api.ServiceTypeLoadBalancer {
newSvc.Status.LoadBalancer = api.LoadBalancerStatus{}
}
}
func needsClusterIP(svc *api.Service) bool {
@ -597,32 +486,10 @@ func sameLoadBalancerClass(oldSvc, newSvc *api.Service) bool {
return *oldSvc.Spec.LoadBalancerClass == *newSvc.Spec.LoadBalancerClass
}
// this func allows user to downgrade a service by just changing
// IPFamilyPolicy to SingleStack
func trimFieldsForDualStackDowngrade(newService, oldService *api.Service) {
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return
}
// not an update
if oldService == nil {
return
}
oldIsDualStack := oldService.Spec.IPFamilyPolicy != nil &&
(*oldService.Spec.IPFamilyPolicy == api.IPFamilyPolicyRequireDualStack ||
*oldService.Spec.IPFamilyPolicy == api.IPFamilyPolicyPreferDualStack)
newIsNotDualStack := newService.Spec.IPFamilyPolicy != nil && *newService.Spec.IPFamilyPolicy == api.IPFamilyPolicySingleStack
// if user want to downgrade then we auto remove secondary ip and family
if oldIsDualStack && newIsNotDualStack {
if len(newService.Spec.ClusterIPs) > 1 {
newService.Spec.ClusterIPs = newService.Spec.ClusterIPs[0:1]
}
if len(newService.Spec.IPFamilies) > 1 {
newService.Spec.IPFamilies = newService.Spec.IPFamilies[0:1]
}
}
func needsExternalTrafficPolicy(svc *api.Service) bool {
return svc.Spec.Type == api.ServiceTypeNodePort || svc.Spec.Type == api.ServiceTypeLoadBalancer
}
func sameExternalTrafficPolicy(oldSvc, newSvc *api.Service) bool {
return oldSvc.Spec.ExternalTrafficPolicy == newSvc.Spec.ExternalTrafficPolicy
}

View File

@ -22,16 +22,14 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/intstr"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
api "k8s.io/kubernetes/pkg/apis/core"
_ "k8s.io/kubernetes/pkg/apis/core/install"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
api "k8s.io/kubernetes/pkg/apis/core"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
netutils "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
@ -115,84 +113,6 @@ func makeValidServiceCustom(tweaks ...func(svc *api.Service)) *api.Service {
return svc
}
func makeServiceWithClusterIp(clusterIP string, clusterIPs []string) *api.Service {
return &api.Service{
Spec: api.ServiceSpec{
ClusterIP: clusterIP,
ClusterIPs: clusterIPs,
},
}
}
// TODO: This should be done on types that are not part of our API
func TestBeforeUpdate(t *testing.T) {
testCases := []struct {
name string
tweakSvc func(oldSvc, newSvc *api.Service) // given basic valid services, each test case can customize them
expectErr bool
}{
{
name: "no change",
tweakSvc: func(oldSvc, newSvc *api.Service) {
// nothing
},
expectErr: false,
},
{
name: "change port",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Spec.Ports[0].Port++
},
expectErr: false,
},
{
name: "bad namespace",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Namespace = "#$%%invalid"
},
expectErr: true,
},
{
name: "change name",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Name += "2"
},
expectErr: true,
},
{
name: "change ClusterIP",
tweakSvc: func(oldSvc, newSvc *api.Service) {
oldSvc.Spec.ClusterIPs = []string{"1.2.3.4"}
newSvc.Spec.ClusterIPs = []string{"4.3.2.1"}
},
expectErr: true,
},
{
name: "change selector",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Spec.Selector = map[string]string{"newkey": "newvalue"}
},
expectErr: false,
},
}
for _, tc := range testCases {
strategy, _ := newStrategy("172.30.0.0/16", false)
oldSvc := makeValidService()
newSvc := makeValidService()
tc.tweakSvc(oldSvc, newSvc)
ctx := genericapirequest.NewDefaultContext()
err := rest.BeforeUpdate(strategy, ctx, runtime.Object(oldSvc), runtime.Object(newSvc))
if tc.expectErr && err == nil {
t.Errorf("unexpected non-error for %q", tc.name)
}
if !tc.expectErr && err != nil {
t.Errorf("unexpected error for %q: %v", tc.name, err)
}
}
}
func TestServiceStatusStrategy(t *testing.T) {
_, testStatusStrategy := newStrategy("10.0.0.0/16", false)
ctx := genericapirequest.NewDefaultContext()
@ -565,155 +485,6 @@ func TestDropDisabledField(t *testing.T) {
}
func TestNormalizeClusterIPs(t *testing.T) {
testCases := []struct {
name string
oldService *api.Service
newService *api.Service
expectedClusterIP string
expectedClusterIPs []string
}{
{
name: "new - only clusterip used",
oldService: nil,
newService: makeServiceWithClusterIp("10.0.0.10", nil),
expectedClusterIP: "10.0.0.10",
expectedClusterIPs: []string{"10.0.0.10"},
},
{
name: "new - only clusterips used",
oldService: nil,
newService: makeServiceWithClusterIp("", []string{"10.0.0.10"}),
expectedClusterIP: "", // this is a validation issue, and validation will catch it
expectedClusterIPs: []string{"10.0.0.10"},
},
{
name: "new - both used",
oldService: nil,
newService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
expectedClusterIP: "10.0.0.10",
expectedClusterIPs: []string{"10.0.0.10"},
},
{
name: "update - no change",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
expectedClusterIP: "10.0.0.10",
expectedClusterIPs: []string{"10.0.0.10"},
},
{
name: "update - malformed change",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("10.0.0.11", []string{"10.0.0.11"}),
expectedClusterIP: "10.0.0.11",
expectedClusterIPs: []string{"10.0.0.11"},
},
{
name: "update - malformed change on secondary ip",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10", "2000::1"}),
newService: makeServiceWithClusterIp("10.0.0.11", []string{"10.0.0.11", "3000::1"}),
expectedClusterIP: "10.0.0.11",
expectedClusterIPs: []string{"10.0.0.11", "3000::1"},
},
{
name: "update - upgrade",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10", "2000::1"}),
expectedClusterIP: "10.0.0.10",
expectedClusterIPs: []string{"10.0.0.10", "2000::1"},
},
{
name: "update - downgrade",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10", "2000::1"}),
newService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
expectedClusterIP: "10.0.0.10",
expectedClusterIPs: []string{"10.0.0.10"},
},
{
name: "update - user cleared cluster IP",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("", []string{"10.0.0.10"}),
expectedClusterIP: "",
expectedClusterIPs: nil,
},
{
name: "update - user cleared clusterIPs", // *MUST* REMAIN FOR OLD CLIENTS
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("10.0.0.10", nil),
expectedClusterIP: "10.0.0.10",
expectedClusterIPs: []string{"10.0.0.10"},
},
{
name: "update - user cleared both",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("", nil),
expectedClusterIP: "",
expectedClusterIPs: nil,
},
{
name: "update - user cleared ClusterIP but changed clusterIPs",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("", []string{"10.0.0.11"}),
expectedClusterIP: "", /* validation catches this */
expectedClusterIPs: []string{"10.0.0.11"},
},
{
name: "update - user cleared ClusterIPs but changed ClusterIP",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10", "2000::1"}),
newService: makeServiceWithClusterIp("10.0.0.11", nil),
expectedClusterIP: "10.0.0.11",
expectedClusterIPs: nil,
},
{
name: "update - user changed from None to ClusterIP",
oldService: makeServiceWithClusterIp("None", []string{"None"}),
newService: makeServiceWithClusterIp("10.0.0.10", []string{"None"}),
expectedClusterIP: "10.0.0.10",
expectedClusterIPs: []string{"10.0.0.10"},
},
{
name: "update - user changed from ClusterIP to None",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10"}),
newService: makeServiceWithClusterIp("None", []string{"10.0.0.10"}),
expectedClusterIP: "None",
expectedClusterIPs: []string{"None"},
},
{
name: "update - user changed from ClusterIP to None and changed ClusterIPs in a dual stack (new client making a mistake)",
oldService: makeServiceWithClusterIp("10.0.0.10", []string{"10.0.0.10", "2000::1"}),
newService: makeServiceWithClusterIp("None", []string{"10.0.0.11", "2000::1"}),
expectedClusterIP: "None",
expectedClusterIPs: []string{"10.0.0.11", "2000::1"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
NormalizeClusterIPs(tc.oldService, tc.newService)
if tc.newService == nil {
t.Fatalf("unexpected new service to be nil")
}
if tc.newService.Spec.ClusterIP != tc.expectedClusterIP {
t.Fatalf("expected clusterIP [%v] got [%v]", tc.expectedClusterIP, tc.newService.Spec.ClusterIP)
}
if len(tc.newService.Spec.ClusterIPs) != len(tc.expectedClusterIPs) {
t.Fatalf("expected clusterIPs %v got %v", tc.expectedClusterIPs, tc.newService.Spec.ClusterIPs)
}
for idx, clusterIP := range tc.newService.Spec.ClusterIPs {
if clusterIP != tc.expectedClusterIPs[idx] {
t.Fatalf("expected clusterIP [%v] at index[%v] got [%v]", tc.expectedClusterIPs[idx], idx, tc.newService.Spec.ClusterIPs[idx])
}
}
})
}
}
func TestDropTypeDependentFields(t *testing.T) {
// Tweaks used below.
setTypeExternalName := func(svc *api.Service) {
@ -998,112 +769,3 @@ func TestDropTypeDependentFields(t *testing.T) {
})
}
}
func TestTrimFieldsForDualStackDowngrade(t *testing.T) {
singleStack := api.IPFamilyPolicySingleStack
preferDualStack := api.IPFamilyPolicyPreferDualStack
requireDualStack := api.IPFamilyPolicyRequireDualStack
testCases := []struct {
name string
oldPolicy *api.IPFamilyPolicyType
oldClusterIPs []string
oldFamilies []api.IPFamily
newPolicy *api.IPFamilyPolicyType
expectedClusterIPs []string
expectedIPFamilies []api.IPFamily
}{
{
name: "no change single to single",
oldPolicy: &singleStack,
oldClusterIPs: []string{"10.10.10.10"},
oldFamilies: []api.IPFamily{api.IPv4Protocol},
newPolicy: &singleStack,
expectedClusterIPs: []string{"10.10.10.10"},
expectedIPFamilies: []api.IPFamily{api.IPv4Protocol},
},
{
name: "dualstack to dualstack (preferred)",
oldPolicy: &preferDualStack,
oldClusterIPs: []string{"10.10.10.10", "2000::1"},
oldFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
newPolicy: &preferDualStack,
expectedClusterIPs: []string{"10.10.10.10", "2000::1"},
expectedIPFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
},
{
name: "dualstack to dualstack (required)",
oldPolicy: &requireDualStack,
oldClusterIPs: []string{"10.10.10.10", "2000::1"},
oldFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
newPolicy: &preferDualStack,
expectedClusterIPs: []string{"10.10.10.10", "2000::1"},
expectedIPFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
},
{
name: "dualstack (preferred) to single",
oldPolicy: &preferDualStack,
oldClusterIPs: []string{"10.10.10.10", "2000::1"},
oldFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
newPolicy: &singleStack,
expectedClusterIPs: []string{"10.10.10.10"},
expectedIPFamilies: []api.IPFamily{api.IPv4Protocol},
},
{
name: "dualstack (require) to single",
oldPolicy: &requireDualStack,
oldClusterIPs: []string{"2000::1", "10.10.10.10"},
oldFamilies: []api.IPFamily{api.IPv6Protocol, api.IPv4Protocol},
newPolicy: &singleStack,
expectedClusterIPs: []string{"2000::1"},
expectedIPFamilies: []api.IPFamily{api.IPv6Protocol},
},
}
// only when gate is on
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
oldService := &api.Service{
Spec: api.ServiceSpec{
IPFamilyPolicy: tc.oldPolicy,
ClusterIPs: tc.oldClusterIPs,
IPFamilies: tc.oldFamilies,
},
}
newService := oldService.DeepCopy()
newService.Spec.IPFamilyPolicy = tc.newPolicy
trimFieldsForDualStackDowngrade(newService, oldService)
if len(newService.Spec.ClusterIPs) != len(tc.expectedClusterIPs) {
t.Fatalf("unexpected clusterIPs. expected %v and got %v", tc.expectedClusterIPs, newService.Spec.ClusterIPs)
}
// compare clusterIPS
for i, expectedIP := range tc.expectedClusterIPs {
if expectedIP != newService.Spec.ClusterIPs[i] {
t.Fatalf("unexpected clusterIPs. expected %v and got %v", tc.expectedClusterIPs, newService.Spec.ClusterIPs)
}
}
// families
if len(newService.Spec.IPFamilies) != len(tc.expectedIPFamilies) {
t.Fatalf("unexpected ipfamilies. expected %v and got %v", tc.expectedIPFamilies, newService.Spec.IPFamilies)
}
// compare clusterIPS
for i, expectedIPFamily := range tc.expectedIPFamilies {
if expectedIPFamily != newService.Spec.IPFamilies[i] {
t.Fatalf("unexpected ipfamilies. expected %v and got %v", tc.expectedIPFamilies, newService.Spec.IPFamilies)
}
}
})
}
}

View File

@ -367,7 +367,7 @@ var _ = common.SIGDescribe("[Feature:IPv6DualStack]", func() {
expectedPolicy := v1.IPFamilyPolicyRequireDualStack
expectedFamilies := []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol}
service := createService(t.ServiceName, t.Namespace, t.Labels, nil, expectedFamilies)
service := createService(t.ServiceName, t.Namespace, t.Labels, &expectedPolicy, expectedFamilies)
jig.Labels = t.Labels
err := jig.CreateServicePods(2)
@ -412,7 +412,7 @@ var _ = common.SIGDescribe("[Feature:IPv6DualStack]", func() {
expectedPolicy := v1.IPFamilyPolicyRequireDualStack
expectedFamilies := []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol}
service := createService(t.ServiceName, t.Namespace, t.Labels, nil, expectedFamilies)
service := createService(t.ServiceName, t.Namespace, t.Labels, &expectedPolicy, expectedFamilies)
jig.Labels = t.Labels
err := jig.CreateServicePods(2)