mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #96600 from maplain/internal-traffic-policy
Service Internal Traffic Policy
This commit is contained in:
commit
2783f2f76e
6
api/openapi-spec/swagger.json
generated
6
api/openapi-spec/swagger.json
generated
@ -10382,6 +10382,10 @@
|
||||
"format": "int32",
|
||||
"type": "integer"
|
||||
},
|
||||
"internalTrafficPolicy": {
|
||||
"description": "InternalTrafficPolicy specifies if the cluster internal traffic should be routed to all endpoints or node-local endpoints only. \"Cluster\" routes internal traffic to a Service to all endpoints. \"Local\" routes traffic to node-local endpoints only, traffic is dropped if no node-local endpoints are ready. The default value is \"Cluster\".",
|
||||
"type": "string"
|
||||
},
|
||||
"ipFamilies": {
|
||||
"description": "IPFamilies is a list of IP families (e.g. IPv4, IPv6) assigned to this service, and is gated by the \"IPv6DualStack\" feature gate. This field is usually assigned automatically based on cluster configuration and the ipFamilyPolicy field. If this field is specified manually, the requested family is available in the cluster, and ipFamilyPolicy allows it, it will be used; otherwise creation of the service will fail. This field is conditionally mutable: it allows for adding or removing a secondary IP family, but it does not allow changing the primary IP family of the Service. Valid values are \"IPv4\" and \"IPv6\". This field only applies to Services of types ClusterIP, NodePort, and LoadBalancer, and does apply to \"headless\" services. This field will be wiped when updating a Service to type ExternalName.\n\nThis field may hold a maximum of two entries (dual-stack families, in either order). These families must correspond to the values of the clusterIPs field, if specified. Both clusterIPs and ipFamilies are governed by the ipFamilyPolicy field.",
|
||||
"items": {
|
||||
@ -10395,7 +10399,7 @@
|
||||
"type": "string"
|
||||
},
|
||||
"loadBalancerClass": {
|
||||
"description": "loadBalancerClass is the class of the load balancer implementation this Service belongs to. If specified, the value of this field must be a label-style identifier, with an optional prefix, e.g. \"internal-vip\" or \"example.com/internal-vip\". Unprefixed names are reserved for end-users. This field can only be set when the Service type is 'LoadBalancer'. If not set, the default load balancer implementation is used, today this is typically done through the cloud provider integration, but should apply for any default implementation. If set, it is assumed that a load balancer implementation is watching for Services with a matching class. Any default load balancer implementation (e.g. cloud providers) should ignore Services that set this field. This field can only be set when creating or updating a Service to type 'LoadBalancer'. Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type. featureGate=LoadBalancerClass",
|
||||
"description": "loadBalancerClass is the class of the load balancer implementation this Service belongs to. If specified, the value of this field must be a label-style identifier, with an optional prefix, e.g. \"internal-vip\" or \"example.com/internal-vip\". Unprefixed names are reserved for end-users. This field can only be set when the Service type is 'LoadBalancer'. If not set, the default load balancer implementation is used, today this is typically done through the cloud provider integration, but should apply for any default implementation. If set, it is assumed that a load balancer implementation is watching for Services with a matching class. Any default load balancer implementation (e.g. cloud providers) should ignore Services that set this field. This field can only be set when creating or updating a Service to type 'LoadBalancer'. Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.",
|
||||
"type": "string"
|
||||
},
|
||||
"loadBalancerIP": {
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
utilnet "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
@ -76,6 +76,15 @@ func RequestsOnlyLocalTraffic(service *v1.Service) bool {
|
||||
return service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal
|
||||
}
|
||||
|
||||
// RequestsOnlyLocalTrafficForInternal checks if service prefers Node Local
|
||||
// endpoints for internal traffic
|
||||
func RequestsOnlyLocalTrafficForInternal(service *v1.Service) bool {
|
||||
if service.Spec.InternalTrafficPolicy == nil {
|
||||
return false
|
||||
}
|
||||
return *service.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal
|
||||
}
|
||||
|
||||
// NeedsHealthCheck checks if service needs health check.
|
||||
func NeedsHealthCheck(service *v1.Service) bool {
|
||||
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
utilnet "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
@ -214,3 +214,30 @@ func TestNeedsHealthCheck(t *testing.T) {
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestRequestsOnlyLocalTrafficForInternal(t *testing.T) {
|
||||
checkRequestsOnlyLocalTrafficForInternal := func(expected bool, service *v1.Service) {
|
||||
res := RequestsOnlyLocalTrafficForInternal(service)
|
||||
if res != expected {
|
||||
t.Errorf("Expected internal local traffic = %v, got %v",
|
||||
expected, res)
|
||||
}
|
||||
}
|
||||
|
||||
// default InternalTrafficPolicy is nil
|
||||
checkRequestsOnlyLocalTrafficForInternal(false, &v1.Service{})
|
||||
|
||||
local := v1.ServiceInternalTrafficPolicyLocal
|
||||
checkRequestsOnlyLocalTrafficForInternal(true, &v1.Service{
|
||||
Spec: v1.ServiceSpec{
|
||||
InternalTrafficPolicy: &local,
|
||||
},
|
||||
})
|
||||
|
||||
cluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
checkRequestsOnlyLocalTrafficForInternal(false, &v1.Service{
|
||||
Spec: v1.ServiceSpec{
|
||||
InternalTrafficPolicy: &cluster,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
@ -3475,6 +3475,19 @@ const (
|
||||
ServiceTypeExternalName ServiceType = "ExternalName"
|
||||
)
|
||||
|
||||
// ServiceInternalTrafficPolicyType describes the type of traffic routing for
|
||||
// internal traffic
|
||||
type ServiceInternalTrafficPolicyType string
|
||||
|
||||
const (
|
||||
// ServiceInternalTrafficPolicyCluster routes traffic to all endpoints
|
||||
ServiceInternalTrafficPolicyCluster ServiceInternalTrafficPolicyType = "Cluster"
|
||||
|
||||
// ServiceInternalTrafficPolicyLocal only routes to node-local
|
||||
// endpoints, otherwise drops the traffic
|
||||
ServiceInternalTrafficPolicyLocal ServiceInternalTrafficPolicyType = "Local"
|
||||
)
|
||||
|
||||
// ServiceExternalTrafficPolicyType string
|
||||
type ServiceExternalTrafficPolicyType string
|
||||
|
||||
@ -3739,9 +3752,19 @@ type ServiceSpec struct {
|
||||
// implementation (e.g. cloud providers) should ignore Services that set this field.
|
||||
// This field can only be set when creating or updating a Service to type 'LoadBalancer'.
|
||||
// Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.
|
||||
// featureGate=LoadBalancerClass
|
||||
// +featureGate=LoadBalancerClass
|
||||
// +optional
|
||||
LoadBalancerClass *string
|
||||
|
||||
// InternalTrafficPolicy specifies if the cluster internal traffic
|
||||
// should be routed to all endpoints or node-local endpoints only.
|
||||
// "Cluster" routes internal traffic to a Service to all endpoints.
|
||||
// "Local" routes traffic to node-local endpoints only, traffic is
|
||||
// dropped if no node-local endpoints are ready.
|
||||
// The default value is "Cluster".
|
||||
// +featureGate=ServiceInternalTrafficPolicy
|
||||
// +optional
|
||||
InternalTrafficPolicy *ServiceInternalTrafficPolicyType
|
||||
}
|
||||
|
||||
// ServicePort represents the port on which the service is exposed
|
||||
|
@ -131,6 +131,11 @@ func SetDefaults_Service(obj *v1.Service) {
|
||||
obj.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && obj.Spec.InternalTrafficPolicy == nil {
|
||||
serviceInternalTrafficPolicyCluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
obj.Spec.InternalTrafficPolicy = &serviceInternalTrafficPolicyCluster
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceLBNodePortControl) {
|
||||
if obj.Spec.Type == v1.ServiceTypeLoadBalancer {
|
||||
if obj.Spec.AllocateLoadBalancerNodePorts == nil {
|
||||
|
@ -29,8 +29,11 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
|
||||
// ensure types are installed
|
||||
@ -1798,3 +1801,64 @@ func TestSetDefaultEnableServiceLinks(t *testing.T) {
|
||||
t.Errorf("Expected enableServiceLinks value: %+v\ngot: %+v\n", v1.DefaultEnableServiceLinks, *output.Spec.EnableServiceLinks)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDefaultServiceInternalTrafficPolicy(t *testing.T) {
|
||||
cluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
local := v1.ServiceInternalTrafficPolicyLocal
|
||||
testCases := []struct {
|
||||
name string
|
||||
expectedInternalTrafficPolicy v1.ServiceInternalTrafficPolicyType
|
||||
svc v1.Service
|
||||
featureGateOn bool
|
||||
}{
|
||||
{
|
||||
name: "must set default internalTrafficPolicy",
|
||||
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyCluster,
|
||||
svc: v1.Service{},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "must not set default internalTrafficPolicy when it's cluster",
|
||||
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyCluster,
|
||||
svc: v1.Service{
|
||||
Spec: v1.ServiceSpec{
|
||||
InternalTrafficPolicy: &cluster,
|
||||
},
|
||||
},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "must not set default internalTrafficPolicy when it's local",
|
||||
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyLocal,
|
||||
svc: v1.Service{
|
||||
Spec: v1.ServiceSpec{
|
||||
InternalTrafficPolicy: &local,
|
||||
},
|
||||
},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "must not set default internalTrafficPolicy when gate is disabled",
|
||||
expectedInternalTrafficPolicy: "",
|
||||
svc: v1.Service{},
|
||||
featureGateOn: false,
|
||||
},
|
||||
}
|
||||
for _, test := range testCases {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, test.featureGateOn)()
|
||||
obj := roundTrip(t, runtime.Object(&test.svc))
|
||||
svc := obj.(*v1.Service)
|
||||
|
||||
if test.expectedInternalTrafficPolicy == "" {
|
||||
if svc.Spec.InternalTrafficPolicy != nil {
|
||||
t.Fatalf("expected .spec.internalTrafficPolicy: null, got %v", *svc.Spec.InternalTrafficPolicy)
|
||||
}
|
||||
} else {
|
||||
if *svc.Spec.InternalTrafficPolicy != test.expectedInternalTrafficPolicy {
|
||||
t.Fatalf("expected .spec.internalTrafficPolicy: %v got %v", test.expectedInternalTrafficPolicy, *svc.Spec.InternalTrafficPolicy)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
2
pkg/apis/core/v1/zz_generated.conversion.go
generated
2
pkg/apis/core/v1/zz_generated.conversion.go
generated
@ -7636,6 +7636,7 @@ func autoConvert_v1_ServiceSpec_To_core_ServiceSpec(in *v1.ServiceSpec, out *cor
|
||||
out.IPFamilyPolicy = (*core.IPFamilyPolicyType)(unsafe.Pointer(in.IPFamilyPolicy))
|
||||
out.AllocateLoadBalancerNodePorts = (*bool)(unsafe.Pointer(in.AllocateLoadBalancerNodePorts))
|
||||
out.LoadBalancerClass = (*string)(unsafe.Pointer(in.LoadBalancerClass))
|
||||
out.InternalTrafficPolicy = (*core.ServiceInternalTrafficPolicyType)(unsafe.Pointer(in.InternalTrafficPolicy))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -7664,6 +7665,7 @@ func autoConvert_core_ServiceSpec_To_v1_ServiceSpec(in *core.ServiceSpec, out *v
|
||||
out.TopologyKeys = *(*[]string)(unsafe.Pointer(&in.TopologyKeys))
|
||||
out.AllocateLoadBalancerNodePorts = (*bool)(unsafe.Pointer(in.AllocateLoadBalancerNodePorts))
|
||||
out.LoadBalancerClass = (*string)(unsafe.Pointer(in.LoadBalancerClass))
|
||||
out.InternalTrafficPolicy = (*v1.ServiceInternalTrafficPolicyType)(unsafe.Pointer(in.InternalTrafficPolicy))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -4171,6 +4171,8 @@ var supportedSessionAffinityType = sets.NewString(string(core.ServiceAffinityCli
|
||||
var supportedServiceType = sets.NewString(string(core.ServiceTypeClusterIP), string(core.ServiceTypeNodePort),
|
||||
string(core.ServiceTypeLoadBalancer), string(core.ServiceTypeExternalName))
|
||||
|
||||
var supportedServiceInternalTrafficPolicy = sets.NewString(string(core.ServiceInternalTrafficPolicyCluster), string(core.ServiceExternalTrafficPolicyTypeLocal))
|
||||
|
||||
var supportedServiceIPFamily = sets.NewString(string(core.IPv4Protocol), string(core.IPv6Protocol))
|
||||
var supportedServiceIPFamilyPolicy = sets.NewString(string(core.IPFamilyPolicySingleStack), string(core.IPFamilyPolicyPreferDualStack), string(core.IPFamilyPolicyRequireDualStack))
|
||||
|
||||
@ -4378,6 +4380,10 @@ func ValidateService(service *core.Service) field.ErrorList {
|
||||
|
||||
// external traffic fields
|
||||
allErrs = append(allErrs, validateServiceExternalTrafficFieldsValue(service)...)
|
||||
|
||||
// internal traffic policy field
|
||||
allErrs = append(allErrs, validateServiceInternalTrafficFieldsValue(service)...)
|
||||
|
||||
return allErrs
|
||||
}
|
||||
|
||||
@ -4446,6 +4452,24 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro
|
||||
return allErrs
|
||||
}
|
||||
|
||||
// validateServiceInternalTrafficFieldsValue validates InternalTraffic related
|
||||
// spec have legal value.
|
||||
func validateServiceInternalTrafficFieldsValue(service *core.Service) field.ErrorList {
|
||||
allErrs := field.ErrorList{}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
|
||||
if service.Spec.InternalTrafficPolicy == nil {
|
||||
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("internalTrafficPolicy"), ""))
|
||||
}
|
||||
}
|
||||
|
||||
if service.Spec.InternalTrafficPolicy != nil && !supportedServiceInternalTrafficPolicy.Has(string(*service.Spec.InternalTrafficPolicy)) {
|
||||
allErrs = append(allErrs, field.NotSupported(field.NewPath("spec").Child("internalTrafficPolicy"), *service.Spec.InternalTrafficPolicy, supportedServiceInternalTrafficPolicy.List()))
|
||||
}
|
||||
|
||||
return allErrs
|
||||
}
|
||||
|
||||
// ValidateServiceExternalTrafficFieldsCombination validates if ExternalTrafficPolicy,
|
||||
// HealthCheckNodePort and Type combination are legal. For update, it should be called
|
||||
// after clearing externalTraffic related fields for the ease of transitioning between
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/featuregate"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/capabilities"
|
||||
@ -10135,9 +10136,10 @@ func TestValidateServiceCreate(t *testing.T) {
|
||||
preferDualStack := core.IPFamilyPolicyPreferDualStack
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tweakSvc func(svc *core.Service) // given a basic valid service, each test case can customize it
|
||||
numErrs int
|
||||
name string
|
||||
tweakSvc func(svc *core.Service) // given a basic valid service, each test case can customize it
|
||||
numErrs int
|
||||
featureGates []featuregate.Feature
|
||||
}{
|
||||
{
|
||||
name: "missing namespace",
|
||||
@ -10750,6 +10752,22 @@ func TestValidateServiceCreate(t *testing.T) {
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "nil internalTraffic field when feature gate is on",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
s.Spec.InternalTrafficPolicy = nil
|
||||
},
|
||||
featureGates: []featuregate.Feature{features.ServiceInternalTrafficPolicy},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "invalid internalTraffic field",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
invalid := core.ServiceInternalTrafficPolicyType("invalid")
|
||||
s.Spec.InternalTrafficPolicy = &invalid
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "nagative healthCheckNodePort field",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
@ -11323,6 +11341,9 @@ func TestValidateServiceCreate(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
for i := range tc.featureGates {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, tc.featureGates[i], true)()
|
||||
}
|
||||
svc := makeValidService()
|
||||
tc.tweakSvc(&svc)
|
||||
errs := ValidateServiceCreate(&svc)
|
||||
|
5
pkg/apis/core/zz_generated.deepcopy.go
generated
5
pkg/apis/core/zz_generated.deepcopy.go
generated
@ -5335,6 +5335,11 @@ func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
|
||||
*out = new(string)
|
||||
**out = **in
|
||||
}
|
||||
if in.InternalTrafficPolicy != nil {
|
||||
in, out := &in.InternalTrafficPolicy, &out.InternalTrafficPolicy
|
||||
*out = new(ServiceInternalTrafficPolicyType)
|
||||
**out = **in
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -709,6 +709,12 @@ const (
|
||||
//
|
||||
// Enable Scope and Namespace fields on IngressClassParametersReference.
|
||||
IngressClassNamespacedParams featuregate.Feature = "IngressClassNamespacedParams"
|
||||
|
||||
// owner: @maplain @andrewsykim
|
||||
// alpha: v1.21
|
||||
//
|
||||
// Enables node-local routing for Service internal traffic
|
||||
ServiceInternalTrafficPolicy featuregate.Feature = "ServiceInternalTrafficPolicy"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -816,6 +822,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
ServiceLoadBalancerClass: {Default: false, PreRelease: featuregate.Alpha},
|
||||
LogarithmicScaleDown: {Default: false, PreRelease: featuregate.Alpha},
|
||||
IngressClassNamespacedParams: {Default: false, PreRelease: featuregate.Alpha},
|
||||
ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
// unintentionally on either side:
|
||||
|
@ -1025,10 +1025,17 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// 2. ServiceTopology is not enabled.
|
||||
// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
|
||||
// to get topology information).
|
||||
if !svcInfo.OnlyNodeLocalEndpoints() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
|
||||
if !svcInfo.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
|
||||
allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints)
|
||||
}
|
||||
|
||||
// Service InternalTrafficPolicy is only enabled when all of the
|
||||
// following are true:
|
||||
// 1. InternalTrafficPolicy is Local
|
||||
// 2. ServiceInternalTrafficPolicy feature gate is on
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
|
||||
allEndpoints = proxy.FilterLocalEndpoint(svcInfo.InternalTrafficPolicy(), allEndpoints)
|
||||
}
|
||||
readyEndpoints := make([]proxy.Endpoint, 0, len(allEndpoints))
|
||||
for _, endpoint := range allEndpoints {
|
||||
if !endpoint.IsReady() {
|
||||
@ -1051,7 +1058,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
svcXlbChain := svcInfo.serviceLBChainName
|
||||
if svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if svcInfo.NodeLocalExternal() {
|
||||
// Only for services request OnlyLocal traffic
|
||||
// create the per-service LB chain, retaining counters if possible.
|
||||
if lbChain, ok := existingNATChains[svcXlbChain]; ok {
|
||||
@ -1144,7 +1151,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// and the traffic is NOT Local. Local traffic coming from Pods and Nodes will
|
||||
// be always forwarded to the corresponding Service, so no need to SNAT
|
||||
// If we can't differentiate the local traffic we always SNAT.
|
||||
if !svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if !svcInfo.NodeLocalExternal() {
|
||||
destChain = svcChain
|
||||
// This masquerades off-cluster traffic to a External IP.
|
||||
if proxier.localDetector.IsImplemented() {
|
||||
@ -1204,7 +1211,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
chosenChain := svcXlbChain
|
||||
// If we are proxying globally, we need to masquerade in case we cross nodes.
|
||||
// If we are proxying only locally, we can retain the source IP.
|
||||
if !svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if !svcInfo.NodeLocalExternal() {
|
||||
utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
chosenChain = svcChain
|
||||
}
|
||||
@ -1301,7 +1308,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", protocol, "-p", protocol,
|
||||
"--dport", strconv.Itoa(svcInfo.NodePort()),
|
||||
)
|
||||
if !svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if !svcInfo.NodeLocalExternal() {
|
||||
// Nodeports need SNAT, unless they're local.
|
||||
utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
// Jump to the service chain.
|
||||
@ -1395,7 +1402,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
localEndpointChains := make([]utiliptables.Chain, 0)
|
||||
for i, endpointChain := range endpointChains {
|
||||
// Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
|
||||
if svcInfo.OnlyNodeLocalEndpoints() && endpoints[i].IsLocal {
|
||||
if svcInfo.NodeLocalExternal() && endpoints[i].IsLocal {
|
||||
localEndpointChains = append(localEndpointChains, endpointChains[i])
|
||||
}
|
||||
|
||||
@ -1436,7 +1443,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// The logic below this applies only if this service is marked as OnlyLocal
|
||||
if !svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if !svcInfo.NodeLocalExternal() {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -32,8 +32,11 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
|
||||
@ -3056,3 +3059,219 @@ func TestProxierMetricsIptablesTotalRules(t *testing.T) {
|
||||
}
|
||||
|
||||
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
|
||||
|
||||
// This test ensures that the iptables proxier supports translating Endpoints to
|
||||
// iptables output when internalTrafficPolicy is specified
|
||||
func TestInternalTrafficPolicyE2E(t *testing.T) {
|
||||
type endpoint struct {
|
||||
ip string
|
||||
hostname string
|
||||
}
|
||||
|
||||
cluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
local := v1.ServiceInternalTrafficPolicyLocal
|
||||
|
||||
clusterExpectedIPTables := `*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
|
||||
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
|
||||
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
|
||||
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
|
||||
featureGateOn bool
|
||||
endpoints []endpoint
|
||||
expectEndpointRule bool
|
||||
expectedIPTablesWithSlice string
|
||||
}{
|
||||
{
|
||||
name: "internalTrafficPolicy is cluster",
|
||||
internalTrafficPolicy: &cluster,
|
||||
featureGateOn: true,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", testHostname},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectEndpointRule: true,
|
||||
expectedIPTablesWithSlice: clusterExpectedIPTables,
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is local and there is non-zero local endpoints",
|
||||
internalTrafficPolicy: &local,
|
||||
featureGateOn: true,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", testHostname},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectEndpointRule: true,
|
||||
expectedIPTablesWithSlice: `*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is local and there is zero local endpoint",
|
||||
internalTrafficPolicy: &local,
|
||||
featureGateOn: true,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", "host0"},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectEndpointRule: false,
|
||||
expectedIPTablesWithSlice: `*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j REJECT
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is local and there is non-zero local endpoint with feature gate off",
|
||||
internalTrafficPolicy: &local,
|
||||
featureGateOn: false,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", testHostname},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectEndpointRule: false,
|
||||
expectedIPTablesWithSlice: clusterExpectedIPTables,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, tc.featureGateOn)()
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt, true)
|
||||
fp.OnServiceSynced()
|
||||
fp.OnEndpointsSynced()
|
||||
fp.OnEndpointSlicesSynced()
|
||||
|
||||
serviceName := "svc1"
|
||||
namespaceName := "ns1"
|
||||
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "172.20.1.1",
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []v1.ServicePort{{Name: "", Port: 80, Protocol: v1.ProtocolTCP}},
|
||||
},
|
||||
}
|
||||
if tc.internalTrafficPolicy != nil {
|
||||
svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy
|
||||
}
|
||||
|
||||
fp.OnServiceAdd(svc)
|
||||
|
||||
tcpProtocol := v1.ProtocolTCP
|
||||
endpointSlice := &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", serviceName),
|
||||
Namespace: namespaceName,
|
||||
Labels: map[string]string{discovery.LabelServiceName: serviceName},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
}
|
||||
for _, ep := range tc.endpoints {
|
||||
endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{
|
||||
Addresses: []string{ep.ip},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": ep.hostname},
|
||||
})
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
assert.Equal(t, tc.expectedIPTablesWithSlice, fp.iptablesData.String())
|
||||
|
||||
if tc.expectEndpointRule {
|
||||
fp.OnEndpointSliceDelete(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
assert.NotEqual(t, tc.expectedIPTablesWithSlice, fp.iptablesData.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1236,7 +1236,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
|
||||
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
|
||||
if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
|
||||
if err := proxier.syncEndpoint(svcName, false, svcInfo.NodeLocalInternal(), serv); err != nil {
|
||||
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
|
||||
}
|
||||
} else {
|
||||
@ -1288,7 +1288,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
SetType: utilipset.HashIPPort,
|
||||
}
|
||||
|
||||
if svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if svcInfo.NodeLocalExternal() {
|
||||
if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
|
||||
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPLocalSet].Name))
|
||||
continue
|
||||
@ -1318,8 +1318,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
|
||||
onlyNodeLocalEndpoints := svcInfo.OnlyNodeLocalEndpoints()
|
||||
if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, serv); err != nil {
|
||||
onlyNodeLocalEndpoints := svcInfo.NodeLocalExternal()
|
||||
onlyNodeLocalEndpointsForInternal := svcInfo.NodeLocalInternal()
|
||||
if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, onlyNodeLocalEndpointsForInternal, serv); err != nil {
|
||||
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
|
||||
}
|
||||
} else {
|
||||
@ -1347,7 +1348,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
|
||||
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
|
||||
if svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if svcInfo.NodeLocalExternal() {
|
||||
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
|
||||
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
|
||||
continue
|
||||
@ -1420,7 +1421,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
|
||||
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil {
|
||||
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
|
||||
}
|
||||
} else {
|
||||
@ -1534,7 +1535,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Add externaltrafficpolicy=local type nodeport entry
|
||||
if svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if svcInfo.NodeLocalExternal() {
|
||||
var nodePortLocalSet *IPSet
|
||||
switch protocol {
|
||||
case utilipset.ProtocolTCP:
|
||||
@ -1579,7 +1580,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
|
||||
if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
|
||||
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil {
|
||||
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
|
||||
}
|
||||
} else {
|
||||
@ -2030,7 +2031,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
|
||||
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, onlyNodeLocalEndpointsForInternal bool, vs *utilipvs.VirtualServer) error {
|
||||
appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get IPVS service, error: %v", err)
|
||||
@ -2065,6 +2066,14 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
||||
endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
|
||||
}
|
||||
|
||||
// Service InternalTrafficPolicy is only enabled when all of the
|
||||
// following are true:
|
||||
// 1. InternalTrafficPolicy is PreferLocal or Local
|
||||
// 2. ServiceInternalTrafficPolicy feature gate is on
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && onlyNodeLocalEndpointsForInternal {
|
||||
endpoints = proxy.FilterLocalEndpoint(proxier.serviceMap[svcPortName].InternalTrafficPolicy(), endpoints)
|
||||
}
|
||||
|
||||
for _, epInfo := range endpoints {
|
||||
if !epInfo.IsReady() {
|
||||
continue
|
||||
|
@ -33,6 +33,9 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
|
||||
@ -4467,3 +4470,183 @@ func TestCreateAndLinkKubeChain(t *testing.T) {
|
||||
assert.Equal(t, expectedNATChains, fp.natChains.String())
|
||||
assert.Equal(t, expectedFilterChains, fp.filterChains.String())
|
||||
}
|
||||
|
||||
// This test ensures that the iptables proxier supports translating Endpoints to
|
||||
// iptables output when internalTrafficPolicy is specified
|
||||
func TestTestInternalTrafficPolicyE2E(t *testing.T) {
|
||||
type endpoint struct {
|
||||
ip string
|
||||
hostname string
|
||||
}
|
||||
|
||||
cluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
local := v1.ServiceInternalTrafficPolicyLocal
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
|
||||
endpoints []endpoint
|
||||
expectVirtualServer bool
|
||||
expectLocalEntries bool
|
||||
expectLocalRealServerNum int
|
||||
expectLocalRealServers []string
|
||||
}{
|
||||
{
|
||||
name: "internalTrafficPolicy is cluster with non-zero local endpoints",
|
||||
internalTrafficPolicy: &cluster,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", testHostname},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectVirtualServer: true,
|
||||
expectLocalEntries: true,
|
||||
expectLocalRealServerNum: 3,
|
||||
expectLocalRealServers: []string{
|
||||
"10.0.1.1:80",
|
||||
"10.0.1.2:80",
|
||||
"10.0.1.3:80",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is cluster with zero local endpoints",
|
||||
internalTrafficPolicy: &cluster,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", "host0"},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectVirtualServer: false,
|
||||
expectLocalEntries: false,
|
||||
expectLocalRealServerNum: 3,
|
||||
expectLocalRealServers: []string{
|
||||
"10.0.1.1:80",
|
||||
"10.0.1.2:80",
|
||||
"10.0.1.3:80",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is local with non-zero local endpoints",
|
||||
internalTrafficPolicy: &local,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", testHostname},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectVirtualServer: true,
|
||||
expectLocalEntries: true,
|
||||
expectLocalRealServerNum: 1,
|
||||
expectLocalRealServers: []string{
|
||||
"10.0.1.1:80",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is local with zero local endpoints",
|
||||
internalTrafficPolicy: &local,
|
||||
endpoints: []endpoint{
|
||||
{"10.0.1.1", "host0"},
|
||||
{"10.0.1.2", "host1"},
|
||||
{"10.0.1.3", "host2"},
|
||||
},
|
||||
expectVirtualServer: false,
|
||||
expectLocalEntries: false,
|
||||
expectLocalRealServerNum: 0,
|
||||
expectLocalRealServers: []string{},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)()
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol)
|
||||
fp.servicesSynced = true
|
||||
fp.endpointsSynced = true
|
||||
fp.endpointSlicesSynced = true
|
||||
|
||||
// Add initial service
|
||||
serviceName := "svc1"
|
||||
namespaceName := "ns1"
|
||||
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "172.20.1.1",
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
|
||||
},
|
||||
}
|
||||
if tc.internalTrafficPolicy != nil {
|
||||
svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy
|
||||
}
|
||||
|
||||
fp.OnServiceAdd(svc)
|
||||
|
||||
// Add initial endpoint slice
|
||||
tcpProtocol := v1.ProtocolTCP
|
||||
endpointSlice := &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", serviceName),
|
||||
Namespace: namespaceName,
|
||||
Labels: map[string]string{discovery.LabelServiceName: serviceName},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
}
|
||||
|
||||
for _, ep := range tc.endpoints {
|
||||
endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{
|
||||
Addresses: []string{ep.ip},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": ep.hostname},
|
||||
})
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after EndpointSlice update
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
|
||||
|
||||
activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
|
||||
|
||||
if tc.expectLocalEntries {
|
||||
assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK")
|
||||
} else {
|
||||
assert.Equal(t, 0, activeEntries1.Len(), "Expected no active entry in KUBE-LOOP-BACK")
|
||||
}
|
||||
|
||||
if tc.expectVirtualServer {
|
||||
virtualServers1, vsErr1 := ipvs.GetVirtualServers()
|
||||
assert.Nil(t, vsErr1, "Expected no error getting virtual servers")
|
||||
|
||||
assert.Len(t, virtualServers1, 1, "Expected 1 virtual server")
|
||||
realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0])
|
||||
assert.Nil(t, rsErr1, "Expected no error getting real servers")
|
||||
|
||||
assert.Len(t, realServers1, tc.expectLocalRealServerNum, fmt.Sprintf("Expected %d real servers", tc.expectLocalRealServerNum))
|
||||
for i := 0; i < tc.expectLocalRealServerNum; i++ {
|
||||
assert.Equal(t, realServers1[i].String(), tc.expectLocalRealServers[i])
|
||||
}
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceDelete(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
|
||||
activeEntries3 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
|
||||
assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
|
||||
virtualServers2, vsErr2 := ipvs.GetVirtualServers()
|
||||
assert.Nil(t, vsErr2, "Expected no error getting virtual servers")
|
||||
assert.Len(t, virtualServers2, 1, "Expected 1 virtual server")
|
||||
realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0])
|
||||
assert.Nil(t, rsErr2, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers2, 0, "Expected 0 real servers")
|
||||
}
|
||||
}
|
||||
|
@ -28,8 +28,10 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/record"
|
||||
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
)
|
||||
@ -49,7 +51,9 @@ type BaseServiceInfo struct {
|
||||
externalIPs []string
|
||||
loadBalancerSourceRanges []string
|
||||
healthCheckNodePort int
|
||||
onlyNodeLocalEndpoints bool
|
||||
nodeLocalExternal bool
|
||||
nodeLocalInternal bool
|
||||
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
|
||||
topologyKeys []string
|
||||
}
|
||||
|
||||
@ -114,9 +118,19 @@ func (info *BaseServiceInfo) LoadBalancerIPStrings() []string {
|
||||
return ips
|
||||
}
|
||||
|
||||
// OnlyNodeLocalEndpoints is part of ServicePort interface.
|
||||
func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool {
|
||||
return info.onlyNodeLocalEndpoints
|
||||
// NodeLocalExternal is part of ServicePort interface.
|
||||
func (info *BaseServiceInfo) NodeLocalExternal() bool {
|
||||
return info.nodeLocalExternal
|
||||
}
|
||||
|
||||
// NodeLocalInternal is part of ServicePort interface
|
||||
func (info *BaseServiceInfo) NodeLocalInternal() bool {
|
||||
return info.nodeLocalInternal
|
||||
}
|
||||
|
||||
// InternalTrafficPolicy is part of ServicePort interface
|
||||
func (info *BaseServiceInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType {
|
||||
return info.internalTrafficPolicy
|
||||
}
|
||||
|
||||
// TopologyKeys is part of ServicePort interface.
|
||||
@ -125,9 +139,13 @@ func (info *BaseServiceInfo) TopologyKeys() []string {
|
||||
}
|
||||
|
||||
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
|
||||
onlyNodeLocalEndpoints := false
|
||||
nodeLocalExternal := false
|
||||
if apiservice.RequestsOnlyLocalTraffic(service) {
|
||||
onlyNodeLocalEndpoints = true
|
||||
nodeLocalExternal = true
|
||||
}
|
||||
nodeLocalInternal := false
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
|
||||
nodeLocalInternal = apiservice.RequestsOnlyLocalTrafficForInternal(service)
|
||||
}
|
||||
var stickyMaxAgeSeconds int
|
||||
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
|
||||
@ -137,14 +155,16 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
|
||||
|
||||
clusterIP := utilproxy.GetClusterIPByFamily(sct.ipFamily, service)
|
||||
info := &BaseServiceInfo{
|
||||
clusterIP: net.ParseIP(clusterIP),
|
||||
port: int(port.Port),
|
||||
protocol: port.Protocol,
|
||||
nodePort: int(port.NodePort),
|
||||
sessionAffinityType: service.Spec.SessionAffinity,
|
||||
stickyMaxAgeSeconds: stickyMaxAgeSeconds,
|
||||
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
||||
topologyKeys: service.Spec.TopologyKeys,
|
||||
clusterIP: net.ParseIP(clusterIP),
|
||||
port: int(port.Port),
|
||||
protocol: port.Protocol,
|
||||
nodePort: int(port.NodePort),
|
||||
sessionAffinityType: service.Spec.SessionAffinity,
|
||||
stickyMaxAgeSeconds: stickyMaxAgeSeconds,
|
||||
nodeLocalExternal: nodeLocalExternal,
|
||||
nodeLocalInternal: nodeLocalInternal,
|
||||
internalTrafficPolicy: service.Spec.InternalTrafficPolicy,
|
||||
topologyKeys: service.Spec.TopologyKeys,
|
||||
}
|
||||
|
||||
loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges))
|
||||
|
@ -18,6 +18,8 @@ package proxy
|
||||
|
||||
import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
// FilterTopologyEndpoint returns the appropriate endpoints based on the cluster
|
||||
@ -44,7 +46,7 @@ func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string,
|
||||
return endpoints
|
||||
}
|
||||
|
||||
filteredEndpoint := []Endpoint{}
|
||||
filteredEndpoints := []Endpoint{}
|
||||
|
||||
if len(nodeLabels) == 0 {
|
||||
if topologyKeys[len(topologyKeys)-1] == v1.TopologyKeyAny {
|
||||
@ -54,7 +56,7 @@ func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string,
|
||||
}
|
||||
// edge case: do not include any endpoints if topology key "Any" is
|
||||
// not specified when we cannot determine current node's topology.
|
||||
return filteredEndpoint
|
||||
return filteredEndpoints
|
||||
}
|
||||
|
||||
for _, key := range topologyKeys {
|
||||
@ -69,12 +71,40 @@ func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string,
|
||||
for _, ep := range endpoints {
|
||||
topology := ep.GetTopology()
|
||||
if value, found := topology[key]; found && value == topologyValue {
|
||||
filteredEndpoint = append(filteredEndpoint, ep)
|
||||
filteredEndpoints = append(filteredEndpoints, ep)
|
||||
}
|
||||
}
|
||||
if len(filteredEndpoint) > 0 {
|
||||
return filteredEndpoint
|
||||
if len(filteredEndpoints) > 0 {
|
||||
return filteredEndpoints
|
||||
}
|
||||
}
|
||||
return filteredEndpoint
|
||||
return filteredEndpoints
|
||||
}
|
||||
|
||||
// FilterLocalEndpoint returns the node local endpoints based on configured
|
||||
// InternalTrafficPolicy.
|
||||
//
|
||||
// If ServiceInternalTrafficPolicy feature gate is off, returns the original
|
||||
// endpoints slice.
|
||||
// Otherwise, if InternalTrafficPolicy is Local, only return the node local endpoints.
|
||||
func FilterLocalEndpoint(internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType, endpoints []Endpoint) []Endpoint {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
|
||||
return endpoints
|
||||
}
|
||||
if internalTrafficPolicy == nil || *internalTrafficPolicy == v1.ServiceInternalTrafficPolicyCluster {
|
||||
return endpoints
|
||||
}
|
||||
|
||||
var filteredEndpoints []Endpoint
|
||||
|
||||
// Get all the local endpoints
|
||||
for _, ep := range endpoints {
|
||||
if ep.GetIsLocal() {
|
||||
filteredEndpoints = append(filteredEndpoints, ep)
|
||||
}
|
||||
}
|
||||
|
||||
// When internalTrafficPolicy is Local, only return the node local
|
||||
// endpoints
|
||||
return filteredEndpoints
|
||||
}
|
||||
|
@ -22,6 +22,9 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
func TestFilterTopologyEndpoint(t *testing.T) {
|
||||
@ -476,3 +479,97 @@ func TestFilterTopologyEndpoint(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterLocalEndpoint(t *testing.T) {
|
||||
cluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
local := v1.ServiceInternalTrafficPolicyLocal
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
|
||||
endpoints []Endpoint
|
||||
expected []Endpoint
|
||||
featureGateOn bool
|
||||
}{
|
||||
{
|
||||
name: "no internalTrafficPolicy with empty endpoints",
|
||||
internalTrafficPolicy: nil,
|
||||
endpoints: []Endpoint{},
|
||||
expected: []Endpoint{},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "no internalTrafficPolicy with non-empty endpoints",
|
||||
internalTrafficPolicy: nil,
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
},
|
||||
expected: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is cluster",
|
||||
internalTrafficPolicy: &cluster,
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
},
|
||||
expected: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is local with non-zero local endpoints",
|
||||
internalTrafficPolicy: &local,
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
},
|
||||
expected: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "internalTrafficPolicy is local with zero local endpoints",
|
||||
internalTrafficPolicy: &local,
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: false},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false},
|
||||
},
|
||||
expected: nil,
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "feature gate is off, internalTrafficPolicy is local with non-empty endpoints",
|
||||
internalTrafficPolicy: &local,
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false},
|
||||
},
|
||||
expected: []Endpoint{
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
|
||||
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false},
|
||||
},
|
||||
featureGateOn: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, tc.featureGateOn)()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
filteredEndpoint := FilterLocalEndpoint(tc.internalTrafficPolicy, tc.endpoints)
|
||||
if !reflect.DeepEqual(filteredEndpoint, tc.expected) {
|
||||
t.Errorf("expected %v, got %v", tc.expected, filteredEndpoint)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -83,8 +83,12 @@ type ServicePort interface {
|
||||
HealthCheckNodePort() int
|
||||
// GetNodePort returns a service Node port if present. If return 0, it means not present.
|
||||
NodePort() int
|
||||
// GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints
|
||||
OnlyNodeLocalEndpoints() bool
|
||||
// NodeLocalExternal returns if a service has only node local endpoints for external traffic.
|
||||
NodeLocalExternal() bool
|
||||
// NodeLocalInternal returns if a service has only node local endpoints for internal traffic.
|
||||
NodeLocalInternal() bool
|
||||
// InternalTrafficPolicy returns service InternalTrafficPolicy
|
||||
InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType
|
||||
// TopologyKeys returns service TopologyKeys as a string array.
|
||||
TopologyKeys() []string
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ func dropServiceDisabledFields(newSvc *api.Service, oldSvc *api.Service) {
|
||||
newSvc.Spec.TopologyKeys = nil
|
||||
}
|
||||
|
||||
// Clear AllocateLoadBalancerNodePorts if ServiceLBNodePortControl if not enabled
|
||||
// Clear AllocateLoadBalancerNodePorts if ServiceLBNodePortControl is not enabled
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceLBNodePortControl) {
|
||||
if !allocateLoadBalancerNodePortsInUse(oldSvc) {
|
||||
newSvc.Spec.AllocateLoadBalancerNodePorts = nil
|
||||
@ -180,6 +180,13 @@ func dropServiceDisabledFields(newSvc *api.Service, oldSvc *api.Service) {
|
||||
newSvc.Spec.LoadBalancerClass = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Clear InternalTrafficPolicy if not enabled
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
|
||||
if !serviceInternalTrafficPolicyInUse(oldSvc) {
|
||||
newSvc.Spec.InternalTrafficPolicy = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// returns true if svc.Spec.AllocateLoadBalancerNodePorts field is in use
|
||||
@ -240,6 +247,13 @@ func loadBalancerClassInUse(svc *api.Service) bool {
|
||||
return svc.Spec.LoadBalancerClass != nil
|
||||
}
|
||||
|
||||
func serviceInternalTrafficPolicyInUse(svc *api.Service) bool {
|
||||
if svc == nil {
|
||||
return false
|
||||
}
|
||||
return svc.Spec.InternalTrafficPolicy != nil
|
||||
}
|
||||
|
||||
type serviceStatusStrategy struct {
|
||||
Strategy
|
||||
}
|
||||
|
@ -257,19 +257,30 @@ func makeServiceWithLoadBalancerClass(loadBalancerClass *string) *api.Service {
|
||||
}
|
||||
}
|
||||
|
||||
func makeServiceWithInternalTrafficPolicy(policy *api.ServiceInternalTrafficPolicyType) *api.Service {
|
||||
return &api.Service{
|
||||
Spec: api.ServiceSpec{
|
||||
InternalTrafficPolicy: policy,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestDropDisabledField(t *testing.T) {
|
||||
requireDualStack := api.IPFamilyPolicyRequireDualStack
|
||||
preferDualStack := api.IPFamilyPolicyPreferDualStack
|
||||
singleStack := api.IPFamilyPolicySingleStack
|
||||
|
||||
localInternalTrafficPolicy := api.ServiceInternalTrafficPolicyLocal
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
enableDualStack bool
|
||||
enableMixedProtocol bool
|
||||
enableLoadBalancerClass bool
|
||||
svc *api.Service
|
||||
oldSvc *api.Service
|
||||
compareSvc *api.Service
|
||||
name string
|
||||
enableDualStack bool
|
||||
enableMixedProtocol bool
|
||||
enableLoadBalancerClass bool
|
||||
enableInternalTrafficPolicy bool
|
||||
svc *api.Service
|
||||
oldSvc *api.Service
|
||||
compareSvc *api.Service
|
||||
}{
|
||||
{
|
||||
name: "not dual stack, field not used",
|
||||
@ -500,6 +511,28 @@ func TestDropDisabledField(t *testing.T) {
|
||||
oldSvc: makeServiceWithLoadBalancerClass(utilpointer.StringPtr("test.com/test")),
|
||||
compareSvc: makeServiceWithLoadBalancerClass(nil),
|
||||
},
|
||||
/* svc.spec.internalTrafficPolicy */
|
||||
{
|
||||
name: "internal traffic policy not enabled, field used in old, not used in new",
|
||||
enableInternalTrafficPolicy: false,
|
||||
svc: makeServiceWithInternalTrafficPolicy(nil),
|
||||
oldSvc: makeServiceWithInternalTrafficPolicy(&localInternalTrafficPolicy),
|
||||
compareSvc: makeServiceWithInternalTrafficPolicy(nil),
|
||||
},
|
||||
{
|
||||
name: "internal traffic policy not enabled, field not used in old, used in new",
|
||||
enableInternalTrafficPolicy: false,
|
||||
svc: makeServiceWithInternalTrafficPolicy(&localInternalTrafficPolicy),
|
||||
oldSvc: makeServiceWithInternalTrafficPolicy(nil),
|
||||
compareSvc: makeServiceWithInternalTrafficPolicy(nil),
|
||||
},
|
||||
{
|
||||
name: "internal traffic policy enabled, field not used in old, used in new",
|
||||
enableInternalTrafficPolicy: true,
|
||||
svc: makeServiceWithInternalTrafficPolicy(&localInternalTrafficPolicy),
|
||||
oldSvc: makeServiceWithInternalTrafficPolicy(nil),
|
||||
compareSvc: makeServiceWithInternalTrafficPolicy(&localInternalTrafficPolicy),
|
||||
},
|
||||
/* add more tests for other dropped fields as needed */
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
@ -507,12 +540,13 @@ func TestDropDisabledField(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MixedProtocolLBService, tc.enableMixedProtocol)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceLoadBalancerClass, tc.enableLoadBalancerClass)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, tc.enableInternalTrafficPolicy)()
|
||||
old := tc.oldSvc.DeepCopy()
|
||||
|
||||
// to test against user using IPFamily not set on cluster
|
||||
dropServiceDisabledFields(tc.svc, tc.oldSvc)
|
||||
|
||||
// old node should never be changed
|
||||
// old node should never be changed
|
||||
if !reflect.DeepEqual(tc.oldSvc, old) {
|
||||
t.Errorf("%v: old svc changed: %v", tc.name, diff.ObjectReflectDiff(tc.oldSvc, old))
|
||||
}
|
||||
|
1806
staging/src/k8s.io/api/core/v1/generated.pb.go
generated
1806
staging/src/k8s.io/api/core/v1/generated.pb.go
generated
File diff suppressed because it is too large
Load Diff
@ -5045,9 +5045,19 @@ message ServiceSpec {
|
||||
// implementation (e.g. cloud providers) should ignore Services that set this field.
|
||||
// This field can only be set when creating or updating a Service to type 'LoadBalancer'.
|
||||
// Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.
|
||||
// featureGate=LoadBalancerClass
|
||||
// +featureGate=LoadBalancerClass
|
||||
// +optional
|
||||
optional string loadBalancerClass = 21;
|
||||
|
||||
// InternalTrafficPolicy specifies if the cluster internal traffic
|
||||
// should be routed to all endpoints or node-local endpoints only.
|
||||
// "Cluster" routes internal traffic to a Service to all endpoints.
|
||||
// "Local" routes traffic to node-local endpoints only, traffic is
|
||||
// dropped if no node-local endpoints are ready.
|
||||
// The default value is "Cluster".
|
||||
// +featureGate=ServiceInternalTrafficPolicy
|
||||
// +optional
|
||||
optional string internalTrafficPolicy = 22;
|
||||
}
|
||||
|
||||
// ServiceStatus represents the current status of a service.
|
||||
|
@ -3940,6 +3940,19 @@ const (
|
||||
ServiceTypeExternalName ServiceType = "ExternalName"
|
||||
)
|
||||
|
||||
// ServiceInternalTrafficPolicyType describes the type of traffic routing for
|
||||
// internal traffic
|
||||
type ServiceInternalTrafficPolicyType string
|
||||
|
||||
const (
|
||||
// ServiceInternalTrafficPolicyCluster routes traffic to all endpoints
|
||||
ServiceInternalTrafficPolicyCluster ServiceInternalTrafficPolicyType = "Cluster"
|
||||
|
||||
// ServiceInternalTrafficPolicyLocal only routes to node-local
|
||||
// endpoints, otherwise drops the traffic
|
||||
ServiceInternalTrafficPolicyLocal ServiceInternalTrafficPolicyType = "Local"
|
||||
)
|
||||
|
||||
// Service External Traffic Policy Type string
|
||||
type ServiceExternalTrafficPolicyType string
|
||||
|
||||
@ -4272,9 +4285,19 @@ type ServiceSpec struct {
|
||||
// implementation (e.g. cloud providers) should ignore Services that set this field.
|
||||
// This field can only be set when creating or updating a Service to type 'LoadBalancer'.
|
||||
// Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.
|
||||
// featureGate=LoadBalancerClass
|
||||
// +featureGate=LoadBalancerClass
|
||||
// +optional
|
||||
LoadBalancerClass *string `json:"loadBalancerClass,omitempty" protobuf:"bytes,21,opt,name=loadBalancerClass"`
|
||||
|
||||
// InternalTrafficPolicy specifies if the cluster internal traffic
|
||||
// should be routed to all endpoints or node-local endpoints only.
|
||||
// "Cluster" routes internal traffic to a Service to all endpoints.
|
||||
// "Local" routes traffic to node-local endpoints only, traffic is
|
||||
// dropped if no node-local endpoints are ready.
|
||||
// The default value is "Cluster".
|
||||
// +featureGate=ServiceInternalTrafficPolicy
|
||||
// +optional
|
||||
InternalTrafficPolicy *ServiceInternalTrafficPolicyType `json:"internalTrafficPolicy,omitempty" protobuf:"bytes,22,opt,name=internalTrafficPolicy"`
|
||||
}
|
||||
|
||||
// ServicePort contains information on service's port.
|
||||
|
@ -2257,7 +2257,8 @@ var map_ServiceSpec = map[string]string{
|
||||
"ipFamilies": "IPFamilies is a list of IP families (e.g. IPv4, IPv6) assigned to this service, and is gated by the \"IPv6DualStack\" feature gate. This field is usually assigned automatically based on cluster configuration and the ipFamilyPolicy field. If this field is specified manually, the requested family is available in the cluster, and ipFamilyPolicy allows it, it will be used; otherwise creation of the service will fail. This field is conditionally mutable: it allows for adding or removing a secondary IP family, but it does not allow changing the primary IP family of the Service. Valid values are \"IPv4\" and \"IPv6\". This field only applies to Services of types ClusterIP, NodePort, and LoadBalancer, and does apply to \"headless\" services. This field will be wiped when updating a Service to type ExternalName.\n\nThis field may hold a maximum of two entries (dual-stack families, in either order). These families must correspond to the values of the clusterIPs field, if specified. Both clusterIPs and ipFamilies are governed by the ipFamilyPolicy field.",
|
||||
"ipFamilyPolicy": "IPFamilyPolicy represents the dual-stack-ness requested or required by this Service, and is gated by the \"IPv6DualStack\" feature gate. If there is no value provided, then this field will be set to SingleStack. Services can be \"SingleStack\" (a single IP family), \"PreferDualStack\" (two IP families on dual-stack configured clusters or a single IP family on single-stack clusters), or \"RequireDualStack\" (two IP families on dual-stack configured clusters, otherwise fail). The ipFamilies and clusterIPs fields depend on the value of this field. This field will be wiped when updating a service to type ExternalName.",
|
||||
"allocateLoadBalancerNodePorts": "allocateLoadBalancerNodePorts defines if NodePorts will be automatically allocated for services with type LoadBalancer. Default is \"true\". It may be set to \"false\" if the cluster load-balancer does not rely on NodePorts. allocateLoadBalancerNodePorts may only be set for services with type LoadBalancer and will be cleared if the type is changed to any other type. This field is alpha-level and is only honored by servers that enable the ServiceLBNodePortControl feature.",
|
||||
"loadBalancerClass": "loadBalancerClass is the class of the load balancer implementation this Service belongs to. If specified, the value of this field must be a label-style identifier, with an optional prefix, e.g. \"internal-vip\" or \"example.com/internal-vip\". Unprefixed names are reserved for end-users. This field can only be set when the Service type is 'LoadBalancer'. If not set, the default load balancer implementation is used, today this is typically done through the cloud provider integration, but should apply for any default implementation. If set, it is assumed that a load balancer implementation is watching for Services with a matching class. Any default load balancer implementation (e.g. cloud providers) should ignore Services that set this field. This field can only be set when creating or updating a Service to type 'LoadBalancer'. Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type. featureGate=LoadBalancerClass",
|
||||
"loadBalancerClass": "loadBalancerClass is the class of the load balancer implementation this Service belongs to. If specified, the value of this field must be a label-style identifier, with an optional prefix, e.g. \"internal-vip\" or \"example.com/internal-vip\". Unprefixed names are reserved for end-users. This field can only be set when the Service type is 'LoadBalancer'. If not set, the default load balancer implementation is used, today this is typically done through the cloud provider integration, but should apply for any default implementation. If set, it is assumed that a load balancer implementation is watching for Services with a matching class. Any default load balancer implementation (e.g. cloud providers) should ignore Services that set this field. This field can only be set when creating or updating a Service to type 'LoadBalancer'. Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.",
|
||||
"internalTrafficPolicy": "InternalTrafficPolicy specifies if the cluster internal traffic should be routed to all endpoints or node-local endpoints only. \"Cluster\" routes internal traffic to a Service to all endpoints. \"Local\" routes traffic to node-local endpoints only, traffic is dropped if no node-local endpoints are ready. The default value is \"Cluster\".",
|
||||
}
|
||||
|
||||
func (ServiceSpec) SwaggerDoc() map[string]string {
|
||||
|
@ -5350,6 +5350,11 @@ func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
|
||||
*out = new(string)
|
||||
**out = **in
|
||||
}
|
||||
if in.InternalTrafficPolicy != nil {
|
||||
in, out := &in.InternalTrafficPolicy, &out.InternalTrafficPolicy
|
||||
*out = new(ServiceInternalTrafficPolicyType)
|
||||
**out = **in
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -84,7 +84,8 @@
|
||||
],
|
||||
"ipFamilyPolicy": "9ȫŚ",
|
||||
"allocateLoadBalancerNodePorts": true,
|
||||
"loadBalancerClass": "31"
|
||||
"loadBalancerClass": "31",
|
||||
"internalTrafficPolicy": ""
|
||||
},
|
||||
"status": {
|
||||
"loadBalancer": {
|
||||
@ -94,8 +95,8 @@
|
||||
"hostname": "33",
|
||||
"ports": [
|
||||
{
|
||||
"port": -907310967,
|
||||
"protocol": "喂ƈ斎AO6",
|
||||
"port": 684408190,
|
||||
"protocol": "ƈ斎AO6ĴC浔Ű",
|
||||
"error": "34"
|
||||
}
|
||||
]
|
||||
@ -105,9 +106,9 @@
|
||||
"conditions": [
|
||||
{
|
||||
"type": "35",
|
||||
"status": "C",
|
||||
"observedGeneration": -2492120148461555858,
|
||||
"lastTransitionTime": "2392-12-09T15:37:55Z",
|
||||
"status": "ž(-譵",
|
||||
"observedGeneration": -8651056334266075769,
|
||||
"lastTransitionTime": "2404-01-10T11:35:42Z",
|
||||
"reason": "36",
|
||||
"message": "37"
|
||||
}
|
||||
|
Binary file not shown.
@ -39,6 +39,7 @@ spec:
|
||||
externalName: "29"
|
||||
externalTrafficPolicy: ƏS$+½H牗洝尿
|
||||
healthCheckNodePort: -1965738697
|
||||
internalTrafficPolicy: ""
|
||||
ipFamilies:
|
||||
- 斬³;Ơ歿:狞夌碕ʂɭîcP$Iņɖ
|
||||
ipFamilyPolicy: 9ȫŚ
|
||||
@ -65,11 +66,11 @@ spec:
|
||||
type: 鮽ort昍řČ扷5ƗǸƢ6/ʕVŚ(Ŀ
|
||||
status:
|
||||
conditions:
|
||||
- lastTransitionTime: "2392-12-09T15:37:55Z"
|
||||
- lastTransitionTime: "2404-01-10T11:35:42Z"
|
||||
message: "37"
|
||||
observedGeneration: -2492120148461555858
|
||||
observedGeneration: -8651056334266075769
|
||||
reason: "36"
|
||||
status: C
|
||||
status: ž(-譵
|
||||
type: "35"
|
||||
loadBalancer:
|
||||
ingress:
|
||||
@ -77,5 +78,5 @@ status:
|
||||
ip: "32"
|
||||
ports:
|
||||
- error: "34"
|
||||
port: -907310967
|
||||
protocol: 喂ƈ斎AO6
|
||||
port: 684408190
|
||||
protocol: ƈ斎AO6ĴC浔Ű
|
||||
|
@ -44,6 +44,7 @@ type ServiceSpecApplyConfiguration struct {
|
||||
IPFamilyPolicy *corev1.IPFamilyPolicyType `json:"ipFamilyPolicy,omitempty"`
|
||||
AllocateLoadBalancerNodePorts *bool `json:"allocateLoadBalancerNodePorts,omitempty"`
|
||||
LoadBalancerClass *string `json:"loadBalancerClass,omitempty"`
|
||||
InternalTrafficPolicy *corev1.ServiceInternalTrafficPolicyType `json:"internalTrafficPolicy,omitempty"`
|
||||
}
|
||||
|
||||
// ServiceSpecApplyConfiguration constructs an declarative configuration of the ServiceSpec type for use with
|
||||
@ -224,3 +225,11 @@ func (b *ServiceSpecApplyConfiguration) WithLoadBalancerClass(value string) *Ser
|
||||
b.LoadBalancerClass = &value
|
||||
return b
|
||||
}
|
||||
|
||||
// WithInternalTrafficPolicy sets the InternalTrafficPolicy field in the declarative configuration to the given value
|
||||
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
|
||||
// If called multiple times, the InternalTrafficPolicy field is set to the value of the last call.
|
||||
func (b *ServiceSpecApplyConfiguration) WithInternalTrafficPolicy(value corev1.ServiceInternalTrafficPolicyType) *ServiceSpecApplyConfiguration {
|
||||
b.InternalTrafficPolicy = &value
|
||||
return b
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user