Svc REST: Set Cluster IPs during dry-run Create

Dry-run should behave like a real API call and return valid results.
This commit is contained in:
Tim Hockin 2020-11-25 15:28:35 -08:00
parent 52856f3fbe
commit e338c9db4b
4 changed files with 149 additions and 146 deletions

View File

@ -192,6 +192,13 @@ func SetAllocateLoadBalancerNodePorts(val bool) Tweak {
}
}
// SetUniqueNodePorts sets all nodeports to unique values.
func SetUniqueNodePorts(svc *api.Service) {
for i := range svc.Spec.Ports {
svc.Spec.Ports[i].NodePort = int32(30000 + i)
}
}
// SetHealthCheckNodePort sets the healthCheckNodePort field for a Service.
func SetHealthCheckNodePort(value int32) Tweak {
return func(svc *api.Service) {

View File

@ -190,15 +190,14 @@ func (al *RESTAllocStuff) allocateCreate(service *api.Service, dryRun bool) (tra
}
// Allocate ClusterIPs
//FIXME: we need to put values in, even if dry run - else validation should
//not pass. It does but that should be fixed.
if !dryRun {
if txn, err := al.allocServiceClusterIPsNew(service); err != nil {
result.Revert()
return nil, err
} else {
result = append(result, txn)
}
//TODO(thockin): validation should not pass with empty clusterIP, but it
//does (and is tested!). Fixing that all is a big PR and will have to
//happen later.
if txn, err := al.allocServiceClusterIPsNew(service, dryRun); err != nil {
result.Revert()
return nil, err
} else {
result = append(result, txn)
}
// Allocate ports
@ -378,6 +377,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
// on failure: any ip that should be released, will *not* be released
// on success: any ip that should be released, will be released
defer func() {
//FIXME: plumb dryRun down here
// release the allocated, this is expected to be cleared if the entire function ran to success
if allocated_released, err := rs.alloc.releaseClusterIPs(allocated); err != nil {
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v", service.Namespace, service.Name, err, allocated, allocated_released)
@ -385,6 +385,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
}
// performRelease is set when the enture function ran to success
if performRelease {
//FIXME: plumb dryRun down here
if toReleaseIPs_released, err := rs.alloc.releaseClusterIPs(toReleaseIPs); err != nil {
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v", service.Namespace, service.Name, err, toReleaseIPs, toReleaseIPs_released)
}
@ -527,11 +528,14 @@ func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableO
return r.services.ConvertToTable(ctx, object, tableOptions)
}
func (al *RESTAllocStuff) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string) (map[api.IPFamily]string, error) {
func (al *RESTAllocStuff) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string, dryRun bool) (map[api.IPFamily]string, error) {
allocated := make(map[api.IPFamily]string)
for family, ip := range toAlloc {
allocator := al.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate
if dryRun {
allocator = allocator.DryRun()
}
if ip == "" {
allocatedIP, err := allocator.AllocateNext()
if err != nil {
@ -577,13 +581,13 @@ func (al *RESTAllocStuff) releaseClusterIPs(toRelease map[api.IPFamily]string) (
// standard allocator for dualstackgate==Off, hard wired dependency
// and ignores policy, families and clusterIPs
func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]string, error) {
func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service, dryRun bool) (map[api.IPFamily]string, error) {
toAlloc := make(map[api.IPFamily]string)
// get clusterIP.. empty string if user did not specify an ip
toAlloc[al.defaultServiceIPFamily] = service.Spec.ClusterIP
// alloc
allocated, err := al.allocClusterIPs(service, toAlloc)
allocated, err := al.allocClusterIPs(service, toAlloc, dryRun)
// set
if err == nil {
@ -595,16 +599,19 @@ func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.I
}
//FIXME: merge into allocServiceClusterIPs rather than call it
func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service) (transaction, error) {
func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service, dryRun bool) (transaction, error) {
// clusterIPs that were allocated may need to be released in case of
// failure at a higher level.
toReleaseClusterIPs, err := al.allocServiceClusterIPs(service)
toReleaseClusterIPs, err := al.allocServiceClusterIPs(service, dryRun)
if err != nil {
return nil, err
}
txn := callbackTransaction{
revert: func() {
if dryRun {
return
}
released, err := al.releaseClusterIPs(toReleaseClusterIPs)
if err != nil {
klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v",
@ -616,7 +623,7 @@ func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service) (trans
}
// allocates ClusterIPs for a service
func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) {
func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service, dryRun bool) (map[api.IPFamily]string, error) {
// external name don't get ClusterIPs
if service.Spec.Type == api.ServiceTypeExternalName {
return nil, nil
@ -628,7 +635,7 @@ func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.
}
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return al.allocServiceClusterIP(service)
return al.allocServiceClusterIP(service, dryRun)
}
toAlloc := make(map[api.IPFamily]string)
@ -651,7 +658,7 @@ func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.
}
// allocate
allocated, err := al.allocClusterIPs(service, toAlloc)
allocated, err := al.allocClusterIPs(service, toAlloc, dryRun)
// set if successful
if err == nil {
@ -702,7 +709,8 @@ func (al *RESTAllocStuff) handleClusterIPsForUpdatedService(oldService *api.Serv
// CASE A:
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
allocated, err := al.allocServiceClusterIPs(service)
//FIXME: plumb dryRun down here
allocated, err := al.allocServiceClusterIPs(service, false)
return allocated, nil, err
}
@ -748,7 +756,7 @@ func (al *RESTAllocStuff) handleClusterIPsForUpdatedService(oldService *api.Serv
toAllocate[service.Spec.IPFamilies[1]] = service.Spec.ClusterIPs[1]
// allocate
allocated, err := al.allocClusterIPs(service, toAllocate)
allocated, err := al.allocClusterIPs(service, toAllocate, false) //FIXME: plumb dry-run down here
// set if successful
if err == nil {
service.Spec.ClusterIPs[1] = allocated[service.Spec.IPFamilies[1]]

View File

@ -1062,7 +1062,7 @@ func TestAllocateLoadBalancerNodePorts(t *testing.T) {
name: "allocate false, gate on, port specified",
svc: svctest.MakeService("alloc-false-specific",
svctest.SetTypeLoadBalancer,
svctest.SetNodePorts(30000),
svctest.SetUniqueNodePorts,
svctest.SetAllocateLoadBalancerNodePorts(false)),
expectNodePorts: true,
allocateNodePortGate: true,
@ -1070,7 +1070,7 @@ func TestAllocateLoadBalancerNodePorts(t *testing.T) {
name: "allocate true, gate on, port specified",
svc: svctest.MakeService("alloc-true-specific",
svctest.SetTypeLoadBalancer,
svctest.SetNodePorts(30000),
svctest.SetUniqueNodePorts,
svctest.SetAllocateLoadBalancerNodePorts(true)),
expectNodePorts: true,
allocateNodePortGate: true,

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
machineryutilnet "k8s.io/apimachinery/pkg/util/net"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing"
@ -39,6 +40,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"k8s.io/kubernetes/pkg/registry/registrytest"
netutils "k8s.io/utils/net"
)
@ -51,6 +53,14 @@ func makeIPAllocator(cidr *net.IPNet) ipallocator.Interface {
return al
}
func makePortAllocator(ports machineryutilnet.PortRange) portallocator.Interface {
al, err := portallocator.NewInMemory(ports)
if err != nil {
panic(fmt.Sprintf("error creating port allocator: %v", err))
}
return al
}
func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{
@ -74,7 +84,9 @@ func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusR
}
}
serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, nil)
portAlloc := makePortAllocator(*(machineryutilnet.ParsePortRangeOrDie("30000-32767")))
serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, portAlloc)
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
@ -5051,127 +5063,103 @@ func TestCreateInitIPFields(t *testing.T) {
}
}
// func TestServiceRegistryCreateDryRun(t *testing.T) {
// requireDualStack := api.IPFamilyPolicyRequireDualStack
// testCases := []struct {
// name string
// svc *api.Service
// enableDualStack bool
// }{
// {
// name: "v4 service featuregate off",
// enableDualStack: false,
// svc: &api.Service{
// ObjectMeta: metav1.ObjectMeta{Name: "foo"},
// Spec: api.ServiceSpec{
// Selector: map[string]string{"bar": "baz"},
// SessionAffinity: api.ServiceAffinityNone,
// Type: api.ServiceTypeClusterIP,
// ClusterIP: "1.2.3.4",
// ClusterIPs: []string{"1.2.3.4"},
// Ports: []api.ServicePort{{
// Port: 6502,
// Protocol: api.ProtocolTCP,
// TargetPort: intstr.FromInt(6502),
// }},
// },
// },
// },
// {
// name: "v6 service featuregate on but singlestack",
// enableDualStack: true,
// svc: &api.Service{
// ObjectMeta: metav1.ObjectMeta{Name: "foo"},
// Spec: api.ServiceSpec{
// Selector: map[string]string{"bar": "baz"},
// SessionAffinity: api.ServiceAffinityNone,
// Type: api.ServiceTypeClusterIP,
// IPFamilies: []api.IPFamily{api.IPv6Protocol},
// ClusterIP: "2000:0:0:0:0:0:0:1",
// ClusterIPs: []string{"2000:0:0:0:0:0:0:1"},
// Ports: []api.ServicePort{{
// Port: 6502,
// Protocol: api.ProtocolTCP,
// TargetPort: intstr.FromInt(6502),
// }},
// },
// },
// },
// {
// name: "dualstack v4,v6 service",
// enableDualStack: true,
// svc: &api.Service{
// ObjectMeta: metav1.ObjectMeta{Name: "foo"},
// Spec: api.ServiceSpec{
// Selector: map[string]string{"bar": "baz"},
// SessionAffinity: api.ServiceAffinityNone,
// Type: api.ServiceTypeClusterIP,
// IPFamilyPolicy: &requireDualStack,
// ClusterIP: "1.2.3.4",
// ClusterIPs: []string{"1.2.3.4", "2000:0:0:0:0:0:0:1"},
// IPFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
// Ports: []api.ServicePort{{
// Port: 6502,
// Protocol: api.ProtocolTCP,
// TargetPort: intstr.FromInt(6502),
// }},
// },
// },
// },
// {
// name: "dualstack v6,v4 service",
// enableDualStack: true,
// svc: &api.Service{
// ObjectMeta: metav1.ObjectMeta{Name: "foo"},
// Spec: api.ServiceSpec{
// Selector: map[string]string{"bar": "baz"},
// SessionAffinity: api.ServiceAffinityNone,
// Type: api.ServiceTypeClusterIP,
// IPFamilyPolicy: &requireDualStack,
// ClusterIP: "2000:0:0:0:0:0:0:1",
// ClusterIPs: []string{"2000:0:0:0:0:0:0:1", "1.2.3.4"},
// IPFamilies: []api.IPFamily{api.IPv6Protocol, api.IPv4Protocol},
// Ports: []api.ServicePort{{
// Port: 6502,
// Protocol: api.ProtocolTCP,
// TargetPort: intstr.FromInt(6502),
// }},
// },
// },
// },
// }
//
// for _, tc := range testCases {
// t.Run(tc.name, func(t *testing.T) {
// defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
//
// families := []api.IPFamily{api.IPv4Protocol}
// if tc.enableDualStack {
// families = append(families, api.IPv6Protocol)
// }
// storage, registry, server := NewTestREST(t, nil, families)
// defer server.Terminate(t)
//
// ctx := genericapirequest.NewDefaultContext()
// _, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
// if err != nil {
// t.Fatalf("Unexpected error: %v", err)
// }
//
// for i, family := range tc.svc.Spec.IPFamilies {
// alloc := storage.alloc.serviceIPAllocatorsByFamily[family]
// if alloc.Has(netutils.ParseIPSloppy(tc.svc.Spec.ClusterIPs[i])) {
// t.Errorf("unexpected side effect: ip allocated %v", tc.svc.Spec.ClusterIPs[i])
// }
// }
//
// srv, err := registry.GetService(ctx, tc.svc.Name, &metav1.GetOptions{})
// if err != nil {
// t.Errorf("unexpected error: %v", err)
// }
// if srv != nil {
// t.Errorf("unexpected service found: %v", srv)
// }
// })
// }
// }
// Prove that a dry-run create doesn't actually allocate IPs or ports.
func TestCreateDryRun(t *testing.T) {
testCases := []struct {
name string
clusterFamilies []api.IPFamily
enableDualStack bool
svc *api.Service
}{{
name: "singlestack:v4_gate:off_clusterip:unset",
clusterFamilies: []api.IPFamily{api.IPv4Protocol},
enableDualStack: false,
svc: svctest.MakeService("foo"),
}, {
name: "singlestack:v4_gate:off_clusterip:set",
clusterFamilies: []api.IPFamily{api.IPv4Protocol},
enableDualStack: false,
svc: svctest.MakeService("foo", svctest.SetClusterIPs("10.0.0.1")),
}, {
name: "singlestack:v6_gate:on_clusterip:unset",
clusterFamilies: []api.IPFamily{api.IPv6Protocol},
enableDualStack: true,
svc: svctest.MakeService("foo"),
}, {
name: "singlestack:v6_gate:on_clusterip:set",
clusterFamilies: []api.IPFamily{api.IPv6Protocol},
enableDualStack: true,
svc: svctest.MakeService("foo", svctest.SetClusterIPs("2000::1")),
}, {
name: "dualstack:v4v6_gate:on_clusterip:unset",
clusterFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
enableDualStack: true,
svc: svctest.MakeService("foo", svctest.SetIPFamilyPolicy(api.IPFamilyPolicyPreferDualStack)),
}, {
name: "dualstack:v4v6_gate:on_clusterip:set",
clusterFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
enableDualStack: true,
svc: svctest.MakeService("foo", svctest.SetIPFamilyPolicy(api.IPFamilyPolicyPreferDualStack), svctest.SetClusterIPs("10.0.0.1", "2000::1")),
}, {
name: "singlestack:v4_gate:off_type:NodePort_nodeport:unset",
clusterFamilies: []api.IPFamily{api.IPv4Protocol},
enableDualStack: false,
svc: svctest.MakeService("foo", svctest.SetTypeNodePort),
}, {
name: "singlestack:v4_gate:on_type:LoadBalancer_nodePort:set",
clusterFamilies: []api.IPFamily{api.IPv4Protocol},
enableDualStack: true,
svc: svctest.MakeService("foo", svctest.SetTypeLoadBalancer, svctest.SetUniqueNodePorts),
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
storage, _, server := newStorage(t, tc.clusterFamilies)
defer server.Terminate(t)
defer storage.Store.DestroyFunc()
ctx := genericapirequest.NewDefaultContext()
createdObj, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
if err != nil {
t.Fatalf("unexpected error creating service: %v", err)
}
createdSvc := createdObj.(*api.Service)
// Ensure IPs were allocated
if netutils.ParseIPSloppy(createdSvc.Spec.ClusterIP) == nil {
t.Errorf("expected valid clusterIP: %v", createdSvc.Spec.ClusterIP)
}
// Ensure the IP allocators are clean.
if !tc.enableDualStack {
if storage.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol].Has(netutils.ParseIPSloppy(createdSvc.Spec.ClusterIP)) {
t.Errorf("expected IP to not be allocated: %v", createdSvc.Spec.ClusterIP)
}
} else {
for _, ip := range createdSvc.Spec.ClusterIPs {
if netutils.ParseIPSloppy(ip) == nil {
t.Errorf("expected valid clusterIP: %v", createdSvc.Spec.ClusterIP)
}
}
for i, fam := range createdSvc.Spec.IPFamilies {
if storage.alloc.serviceIPAllocatorsByFamily[fam].Has(netutils.ParseIPSloppy(createdSvc.Spec.ClusterIPs[i])) {
t.Errorf("expected IP to not be allocated: %v", createdSvc.Spec.ClusterIPs[i])
}
}
}
if tc.svc.Spec.Type != api.ServiceTypeClusterIP {
for _, p := range createdSvc.Spec.Ports {
if p.NodePort == 0 {
t.Errorf("expected a NodePort value")
}
if storage.alloc.serviceNodePorts.Has(int(p.NodePort)) {
t.Errorf("expected port to not be allocated: %v", p.NodePort)
}
}
}
})
}
}