Populate ClusterIPs on read

Old stored services will not have the `clusterIPs` field when read back
without this.

This includes some renaming for clarity and expanded comments, and a new
test for default on read.
This commit is contained in:
Tim Hockin 2020-10-26 21:49:57 -07:00
parent ad6a2af7d8
commit a4c9330683
4 changed files with 189 additions and 28 deletions

View File

@ -25,17 +25,17 @@ import (
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/printers" "k8s.io/kubernetes/pkg/printers"
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
printerstorage "k8s.io/kubernetes/pkg/printers/storage" printerstorage "k8s.io/kubernetes/pkg/printers/storage"
"k8s.io/kubernetes/pkg/registry/core/service" "k8s.io/kubernetes/pkg/registry/core/service"
registry "k8s.io/kubernetes/pkg/registry/core/service" registry "k8s.io/kubernetes/pkg/registry/core/service"
svcreg "k8s.io/kubernetes/pkg/registry/core/service"
netutil "k8s.io/utils/net" netutil "k8s.io/utils/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
) )
type GenericREST struct { type GenericREST struct {
@ -85,7 +85,7 @@ func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet,
} }
} }
genericStore := &GenericREST{store, primaryIPFamily, secondaryFamily} genericStore := &GenericREST{store, primaryIPFamily, secondaryFamily}
store.Decorator = genericStore.defaultServiceOnRead // default on read store.Decorator = genericStore.defaultOnRead
return genericStore, &StatusREST{store: &statusStore}, nil return genericStore, &StatusREST{store: &statusStore}, nil
} }
@ -126,35 +126,35 @@ func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.Updat
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options) return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
} }
// defaults fields that were not previously set on read. becomes an // defaultOnRead sets interlinked fields that were not previously set on read.
// essential part of upgrading a service // We can't do this in the normal defaulting path because that same logic
func (r *GenericREST) defaultServiceOnRead(obj runtime.Object) error { // applies on Get, Create, and Update, but we need to distinguish between them.
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { //
return nil // This will be called on both Service and ServiceList types.
} func (r *GenericREST) defaultOnRead(obj runtime.Object) error {
service, ok := obj.(*api.Service) service, ok := obj.(*api.Service)
if ok { if ok {
return r.defaultAServiceOnRead(service) return r.defaultOnReadService(service)
} }
serviceList, ok := obj.(*api.ServiceList) serviceList, ok := obj.(*api.ServiceList)
if ok { if ok {
return r.defaultServiceList(serviceList) return r.defaultOnReadServiceList(serviceList)
} }
// this was not an object we can default // This was not an object we can default. This is not an error, as the
// caching layer can pass through here, too.
return nil return nil
} }
// defaults a service list // defaultOnReadServiceList defaults a ServiceList.
func (r *GenericREST) defaultServiceList(serviceList *api.ServiceList) error { func (r *GenericREST) defaultOnReadServiceList(serviceList *api.ServiceList) error {
if serviceList == nil { if serviceList == nil {
return nil return nil
} }
for i := range serviceList.Items { for i := range serviceList.Items {
err := r.defaultAServiceOnRead(&serviceList.Items[i]) err := r.defaultOnReadService(&serviceList.Items[i])
if err != nil { if err != nil {
return err return err
} }
@ -163,12 +163,22 @@ func (r *GenericREST) defaultServiceList(serviceList *api.ServiceList) error {
return nil return nil
} }
// defaults a single service // defaultOnReadService defaults a single Service.
func (r *GenericREST) defaultAServiceOnRead(service *api.Service) error { func (r *GenericREST) defaultOnReadService(service *api.Service) error {
if service == nil { if service == nil {
return nil return nil
} }
// We might find Services that were written before ClusterIP became plural.
// We still want to present a consistent view of them.
// NOTE: the args are (old, new)
svcreg.NormalizeClusterIPs(nil, service)
// The rest of this does not apply unless dual-stack is enabled.
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return nil
}
if len(service.Spec.IPFamilies) > 0 { if len(service.Spec.IPFamilies) > 0 {
return nil // already defaulted return nil // already defaulted
} }

View File

@ -18,6 +18,7 @@ package storage
import ( import (
"net" "net"
"reflect"
"testing" "testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -322,6 +323,155 @@ func makeServiceList() (undefaulted, defaulted *api.ServiceList) {
return undefaulted, defaulted return undefaulted, defaulted
} }
func TestServiceDefaultOnRead(t *testing.T) {
// Helper makes a mostly-valid Service. Test-cases can tweak it as needed.
makeService := func(tweak func(*api.Service)) *api.Service {
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "svc", Namespace: "ns"},
Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: "1.2.3.4",
ClusterIPs: []string{"1.2.3.4"},
},
}
if tweak != nil {
tweak(svc)
}
return svc
}
// Helper makes a mostly-valid ServiceList. Test-cases can tweak it as needed.
makeServiceList := func(tweak func(*api.ServiceList)) *api.ServiceList {
list := &api.ServiceList{
Items: []api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: "svc", Namespace: "ns"},
Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: "1.2.3.4",
ClusterIPs: []string{"1.2.3.4"},
},
}},
}
if tweak != nil {
tweak(list)
}
return list
}
testCases := []struct {
name string
input runtime.Object
expectErr bool
expect runtime.Object
}{{
name: "no change v4",
input: makeService(nil),
expect: makeService(nil),
}, {
name: "missing clusterIPs v4",
input: makeService(func(svc *api.Service) {
svc.Spec.ClusterIPs = nil
}),
expect: makeService(nil),
}, {
name: "no change v6",
input: makeService(func(svc *api.Service) {
svc.Spec.ClusterIP = "2000::"
svc.Spec.ClusterIPs = []string{"2000::"}
}),
expect: makeService(func(svc *api.Service) {
svc.Spec.ClusterIP = "2000::"
svc.Spec.ClusterIPs = []string{"2000::"}
}),
}, {
name: "missing clusterIPs v6",
input: makeService(func(svc *api.Service) {
svc.Spec.ClusterIP = "2000::"
svc.Spec.ClusterIPs = nil
}),
expect: makeService(func(svc *api.Service) {
svc.Spec.ClusterIP = "2000::"
svc.Spec.ClusterIPs = []string{"2000::"}
}),
}, {
name: "list, no change v4",
input: makeServiceList(nil),
expect: makeServiceList(nil),
}, {
name: "list, missing clusterIPs v4",
input: makeServiceList(func(list *api.ServiceList) {
list.Items[0].Spec.ClusterIPs = nil
}),
expect: makeService(nil),
}, {
name: "not Service or ServiceList",
input: &api.Pod{},
expectErr: false,
}}
for _, tc := range testCases {
makeStorage := func(t *testing.T) (*GenericREST, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{
StorageConfig: etcdStorage,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 1,
ResourcePrefix: "services",
}
_, cidr, err := net.ParseCIDR("10.0.0.0/24")
if err != nil {
t.Fatalf("failed to parse CIDR")
}
serviceStorage, _, err := NewGenericREST(restOptions, *cidr, false)
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
return serviceStorage, server
}
t.Run(tc.name, func(t *testing.T) {
storage, server := makeStorage(t)
defer server.Terminate(t)
defer storage.Store.DestroyFunc()
tmp := tc.input.DeepCopyObject()
err := storage.defaultOnRead(tmp)
if err != nil && !tc.expectErr {
t.Errorf("unexpected error: %v", err)
}
if err == nil && tc.expectErr {
t.Errorf("unexpected success")
}
svc, ok := tmp.(*api.Service)
if !ok {
list, ok := tmp.(*api.ServiceList)
if !ok {
return
}
svc = &list.Items[0]
}
exp, ok := tc.expect.(*api.Service)
if !ok {
list, ok := tc.expect.(*api.ServiceList)
if !ok {
return
}
exp = &list.Items[0]
}
// Verify fields we know are affected
if svc.Spec.ClusterIP != exp.Spec.ClusterIP {
t.Errorf("clusterIP: expected %v, got %v", exp.Spec.ClusterIP, svc.Spec.ClusterIP)
}
if !reflect.DeepEqual(svc.Spec.ClusterIPs, exp.Spec.ClusterIPs) {
t.Errorf("clusterIPs: expected %v, got %v", exp.Spec.ClusterIPs, svc.Spec.ClusterIPs)
}
})
}
}
func TestServiceDefaulting(t *testing.T) { func TestServiceDefaulting(t *testing.T) {
makeStorage := func(t *testing.T, primaryCIDR string, isDualStack bool) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) { makeStorage := func(t *testing.T, primaryCIDR string, isDualStack bool) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "") etcdStorage, server := registrytest.NewEtcdStorage(t, "")
@ -455,15 +605,15 @@ func TestServiceDefaulting(t *testing.T) {
} }
copyUndefaultedList := undefaultedServiceList.DeepCopy() copyUndefaultedList := undefaultedServiceList.DeepCopy()
// run for each service // run for each Service
for i, svc := range copyUndefaultedList.Items { for i, svc := range copyUndefaultedList.Items {
storage.defaultServiceOnRead(&svc) storage.defaultOnRead(&svc)
compareSvc(svc, defaultedServiceList.Items[i]) compareSvc(svc, defaultedServiceList.Items[i])
} }
copyUndefaultedList = undefaultedServiceList.DeepCopy() copyUndefaultedList = undefaultedServiceList.DeepCopy()
// run as a servicr list // run as a ServiceList
storage.defaultServiceOnRead(copyUndefaultedList) storage.defaultOnRead(copyUndefaultedList)
for i, svc := range copyUndefaultedList.Items { for i, svc := range copyUndefaultedList.Items {
compareSvc(svc, defaultedServiceList.Items[i]) compareSvc(svc, defaultedServiceList.Items[i])
} }

View File

@ -97,7 +97,7 @@ func (strategy svcStrategy) PrepareForCreate(ctx context.Context, obj runtime.Ob
service := obj.(*api.Service) service := obj.(*api.Service)
service.Status = api.ServiceStatus{} service.Status = api.ServiceStatus{}
normalizeClusterIPs(nil, service) NormalizeClusterIPs(nil, service)
dropServiceDisabledFields(service, nil) dropServiceDisabledFields(service, nil)
} }
@ -107,7 +107,7 @@ func (strategy svcStrategy) PrepareForUpdate(ctx context.Context, obj, old runti
oldService := old.(*api.Service) oldService := old.(*api.Service)
newService.Status = oldService.Status newService.Status = oldService.Status
normalizeClusterIPs(oldService, newService) NormalizeClusterIPs(oldService, newService)
dropServiceDisabledFields(newService, oldService) dropServiceDisabledFields(newService, oldService)
dropTypeDependentFields(newService, oldService) dropTypeDependentFields(newService, oldService)
trimFieldsForDualStackDowngrade(newService, oldService) trimFieldsForDualStackDowngrade(newService, oldService)
@ -224,8 +224,9 @@ func (serviceStatusStrategy) ValidateUpdate(ctx context.Context, obj, old runtim
return validation.ValidateServiceStatusUpdate(obj.(*api.Service), old.(*api.Service)) return validation.ValidateServiceStatusUpdate(obj.(*api.Service), old.(*api.Service))
} }
// normalizeClusterIPs adjust clusterIPs based on ClusterIP // NormalizeClusterIPs adjust clusterIPs based on ClusterIP. This must not
func normalizeClusterIPs(oldSvc *api.Service, newSvc *api.Service) { // consider any other fields.
func NormalizeClusterIPs(oldSvc, newSvc *api.Service) {
// In all cases here, we don't need to over-think the inputs. Validation // In all cases here, we don't need to over-think the inputs. Validation
// will be called on the new object soon enough. All this needs to do is // will be called on the new object soon enough. All this needs to do is
// try to divine what user meant with these linked fields. The below // try to divine what user meant with these linked fields. The below

View File

@ -704,7 +704,7 @@ func TestNormalizeClusterIPs(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
normalizeClusterIPs(tc.oldService, tc.newService) NormalizeClusterIPs(tc.oldService, tc.newService)
if tc.newService == nil { if tc.newService == nil {
t.Fatalf("unexpected new service to be nil") t.Fatalf("unexpected new service to be nil")