diff --git a/pkg/api/service/testing/make.go b/pkg/api/service/testing/make.go index 0af907e1846..15ab04aeb11 100644 --- a/pkg/api/service/testing/make.go +++ b/pkg/api/service/testing/make.go @@ -48,6 +48,8 @@ func MakeService(name string, tweaks ...Tweak) *api.Service { SetTypeClusterIP(svc) // Default to 1 port SetPorts(MakeServicePort("", 93, intstr.FromInt(76), api.ProtocolTCP))(svc) + // Default internalTrafficPolicy to "Cluster" + SetInternalTrafficPolicy(api.ServiceInternalTrafficPolicyCluster)(svc) for _, tweak := range tweaks { tweak(svc) @@ -151,3 +153,10 @@ func SetNodePorts(values ...int) Tweak { } } } + +// SetInternalTrafficPolicy sets the internalTrafficPolicy field for a Service. +func SetInternalTrafficPolicy(policy api.ServiceInternalTrafficPolicyType) Tweak { + return func(svc *api.Service) { + svc.Spec.InternalTrafficPolicy = &policy + } +} diff --git a/pkg/apis/core/fuzzer/fuzzer.go b/pkg/apis/core/fuzzer/fuzzer.go index fcb9d8a042a..f1ff79632b1 100644 --- a/pkg/apis/core/fuzzer/fuzzer.go +++ b/pkg/apis/core/fuzzer/fuzzer.go @@ -297,6 +297,10 @@ var Funcs = func(codecs runtimeserializer.CodecFactory) []interface{} { types := []core.ServiceExternalTrafficPolicyType{core.ServiceExternalTrafficPolicyTypeCluster, core.ServiceExternalTrafficPolicyTypeLocal} *p = types[c.Rand.Intn(len(types))] }, + func(p *core.ServiceInternalTrafficPolicyType, c fuzz.Continue) { + types := []core.ServiceInternalTrafficPolicyType{core.ServiceInternalTrafficPolicyCluster, core.ServiceInternalTrafficPolicyLocal} + *p = types[c.Rand.Intn(len(types))] + }, func(ct *core.Container, c fuzz.Continue) { c.FuzzNoCustom(ct) // fuzz self without calling this function again ct.TerminationMessagePath = "/" + ct.TerminationMessagePath // Must be non-empty diff --git a/pkg/apis/core/validation/validation_test.go b/pkg/apis/core/validation/validation_test.go index 2975b8ac712..99cb717afde 100644 --- a/pkg/apis/core/validation/validation_test.go +++ b/pkg/apis/core/validation/validation_test.go @@ -10333,6 +10333,7 @@ func TestValidatePodStatusUpdate(t *testing.T) { } func makeValidService() core.Service { + clusterInternalTrafficPolicy := core.ServiceInternalTrafficPolicyCluster return core.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "valid", @@ -10342,10 +10343,11 @@ func makeValidService() core.Service { ResourceVersion: "1", }, Spec: core.ServiceSpec{ - Selector: map[string]string{"key": "val"}, - SessionAffinity: "None", - Type: core.ServiceTypeClusterIP, - Ports: []core.ServicePort{{Name: "p", Protocol: "TCP", Port: 8675, TargetPort: intstr.FromInt(8675)}}, + Selector: map[string]string{"key": "val"}, + SessionAffinity: "None", + Type: core.ServiceTypeClusterIP, + Ports: []core.ServicePort{{Name: "p", Protocol: "TCP", Port: 8675, TargetPort: intstr.FromInt(8675)}}, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, }, } } @@ -11286,6 +11288,22 @@ func TestValidateServiceCreate(t *testing.T) { }, numErrs: 1, }, + { + name: "internalTrafficPolicy field set to Cluster", + tweakSvc: func(s *core.Service) { + cluster := core.ServiceInternalTrafficPolicyCluster + s.Spec.InternalTrafficPolicy = &cluster + }, + numErrs: 0, + }, + { + name: "internalTrafficPolicy field set to Local", + tweakSvc: func(s *core.Service) { + local := core.ServiceInternalTrafficPolicyLocal + s.Spec.InternalTrafficPolicy = &local + }, + numErrs: 0, + }, { name: "nagative healthCheckNodePort field", tweakSvc: func(s *core.Service) { @@ -14370,6 +14388,28 @@ func TestValidateServiceUpdate(t *testing.T) { }, numErrs: 2, }, + { + name: "update internalTrafficPolicy from Cluster to Local", + tweakSvc: func(oldSvc, newSvc *core.Service) { + cluster := core.ServiceInternalTrafficPolicyCluster + oldSvc.Spec.InternalTrafficPolicy = &cluster + + local := core.ServiceInternalTrafficPolicyLocal + newSvc.Spec.InternalTrafficPolicy = &local + }, + numErrs: 0, + }, + { + name: "update internalTrafficPolicy from Local to Cluster", + tweakSvc: func(oldSvc, newSvc *core.Service) { + local := core.ServiceInternalTrafficPolicyLocal + oldSvc.Spec.InternalTrafficPolicy = &local + + cluster := core.ServiceInternalTrafficPolicyCluster + newSvc.Spec.InternalTrafficPolicy = &cluster + }, + numErrs: 0, + }, } for _, tc := range testCases { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index fb65ec4232a..ef683cda7ec 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -846,7 +846,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS ServiceLoadBalancerClass: {Default: true, PreRelease: featuregate.Beta}, LogarithmicScaleDown: {Default: false, PreRelease: featuregate.Alpha}, IngressClassNamespacedParams: {Default: true, PreRelease: featuregate.Beta}, - ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha}, + ServiceInternalTrafficPolicy: {Default: true, PreRelease: featuregate.Beta}, SuspendJob: {Default: true, PreRelease: featuregate.Beta}, KubeletPodResourcesGetAllocatable: {Default: false, PreRelease: featuregate.Alpha}, NamespaceDefaultLabelName: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.24 diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index d9158c0bc51..36b80b6d7ab 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4781,7 +4781,7 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { fp.endpointsSynced = true fp.endpointSlicesSynced = true - localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal + clusterInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster serviceName := "svc1" // Add initial service @@ -4793,7 +4793,7 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, - InternalTrafficPolicy: &localInternalTrafficPolicy, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, ExternalIPs: []string{ "1.2.3.4", }, @@ -4957,7 +4957,7 @@ func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { fp.endpointsSynced = true fp.endpointSlicesSynced = true - localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal + clusterInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster // Add initial service serviceName := "svc1" @@ -4969,7 +4969,7 @@ func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, - InternalTrafficPolicy: &localInternalTrafficPolicy, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, ExternalIPs: []string{ "1.2.3.4", }, @@ -5130,7 +5130,7 @@ func Test_EndpointSliceOnlyReadyAndTerminatingLocalWithFeatureGateDisabled(t *te fp.endpointsSynced = true fp.endpointSlicesSynced = true - localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal + clusterInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster // Add initial service serviceName := "svc1" @@ -5142,7 +5142,7 @@ func Test_EndpointSliceOnlyReadyAndTerminatingLocalWithFeatureGateDisabled(t *te Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, - InternalTrafficPolicy: &localInternalTrafficPolicy, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, ExternalIPs: []string{ "1.2.3.4", }, diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index 233d87264ae..c472ee8c8a4 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -1451,6 +1451,70 @@ func TestServiceRegistryExternalTrafficGlobal(t *testing.T) { } } +// Validate the internalTrafficPolicy field when set to "Cluster" then updated to "Local" +func TestServiceRegistryInternalTrafficPolicyClusterThenLocal(t *testing.T) { + ctx := genericapirequest.NewDefaultContext() + storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol}) + defer server.Terminate(t) + svc := svctest.MakeService("internal-traffic-policy-cluster", + svctest.SetInternalTrafficPolicy(api.ServiceInternalTrafficPolicyCluster), + ) + obj, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if obj == nil || err != nil { + t.Errorf("Unexpected failure creating service %v", err) + } + + createdSvc := obj.(*api.Service) + if *createdSvc.Spec.InternalTrafficPolicy != api.ServiceInternalTrafficPolicyCluster { + t.Errorf("Expecting internalTrafficPolicy field to have value Cluster, got: %s", *createdSvc.Spec.InternalTrafficPolicy) + } + + update := createdSvc.DeepCopy() + local := api.ServiceInternalTrafficPolicyLocal + update.Spec.InternalTrafficPolicy = &local + + updatedSvc, _, errUpdate := storage.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if errUpdate != nil { + t.Fatalf("unexpected error during update %v", errUpdate) + } + updatedService := updatedSvc.(*api.Service) + if *updatedService.Spec.InternalTrafficPolicy != api.ServiceInternalTrafficPolicyLocal { + t.Errorf("Expected internalTrafficPolicy to be Local, got: %s", *updatedService.Spec.InternalTrafficPolicy) + } +} + +// Validate the internalTrafficPolicy field when set to "Local" and then updated to "Cluster" +func TestServiceRegistryInternalTrafficPolicyLocalThenCluster(t *testing.T) { + ctx := genericapirequest.NewDefaultContext() + storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol}) + defer server.Terminate(t) + svc := svctest.MakeService("internal-traffic-policy-cluster", + svctest.SetInternalTrafficPolicy(api.ServiceInternalTrafficPolicyLocal), + ) + obj, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if obj == nil || err != nil { + t.Errorf("Unexpected failure creating service %v", err) + } + + createdSvc := obj.(*api.Service) + if *createdSvc.Spec.InternalTrafficPolicy != api.ServiceInternalTrafficPolicyLocal { + t.Errorf("Expecting internalTrafficPolicy field to have value Local, got: %s", *createdSvc.Spec.InternalTrafficPolicy) + } + + update := createdSvc.DeepCopy() + cluster := api.ServiceInternalTrafficPolicyCluster + update.Spec.InternalTrafficPolicy = &cluster + + updatedSvc, _, errUpdate := storage.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if errUpdate != nil { + t.Fatalf("unexpected error during update %v", errUpdate) + } + updatedService := updatedSvc.(*api.Service) + if *updatedService.Spec.InternalTrafficPolicy != api.ServiceInternalTrafficPolicyCluster { + t.Errorf("Expected internalTrafficPolicy to be Cluster, got: %s", *updatedService.Spec.InternalTrafficPolicy) + } +} + func TestInitClusterIP(t *testing.T) { testCases := []struct { name string diff --git a/pkg/registry/core/service/storage/storage_test.go b/pkg/registry/core/service/storage/storage_test.go index 3c5fc8040bf..9aab79788a5 100644 --- a/pkg/registry/core/service/storage/storage_test.go +++ b/pkg/registry/core/service/storage/storage_test.go @@ -54,6 +54,7 @@ func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcd3testing.EtcdTest func validService() *api.Service { singleStack := api.IPFamilyPolicySingleStack + clusterInternalTrafficPolicy := api.ServiceInternalTrafficPolicyCluster return &api.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -73,6 +74,7 @@ func validService() *api.Service { Protocol: api.ProtocolTCP, TargetPort: intstr.FromInt(6502), }}, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, }, } } @@ -109,6 +111,8 @@ func TestCreate(t *testing.T) { } func TestUpdate(t *testing.T) { + clusterInternalTrafficPolicy := api.ServiceInternalTrafficPolicyCluster + storage, _, server := newStorage(t) defer server.Terminate(t) defer storage.Store.DestroyFunc() @@ -130,6 +134,7 @@ func TestUpdate(t *testing.T) { Protocol: api.ProtocolTCP, TargetPort: intstr.FromInt(6502), }}, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, } return object }, diff --git a/pkg/registry/core/service/strategy_test.go b/pkg/registry/core/service/strategy_test.go index 84c8ba2668b..91c4b6e50ed 100644 --- a/pkg/registry/core/service/strategy_test.go +++ b/pkg/registry/core/service/strategy_test.go @@ -67,6 +67,7 @@ func TestCheckGeneratedNameError(t *testing.T) { func makeValidService() *api.Service { preferDual := api.IPFamilyPolicyPreferDualStack + clusterInternalTrafficPolicy := api.ServiceInternalTrafficPolicyCluster return &api.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -84,10 +85,11 @@ func makeValidService() *api.Service { makeValidServicePort("p", "TCP", 8675), makeValidServicePort("q", "TCP", 309), }, - ClusterIP: "1.2.3.4", - ClusterIPs: []string{"1.2.3.4", "5:6:7::8"}, - IPFamilyPolicy: &preferDual, - IPFamilies: []api.IPFamily{"IPv4", "IPv6"}, + ClusterIP: "1.2.3.4", + ClusterIPs: []string{"1.2.3.4", "5:6:7::8"}, + IPFamilyPolicy: &preferDual, + IPFamilies: []api.IPFamily{"IPv4", "IPv6"}, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, }, } }