diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 61af3b7c0e8..443a955a8d8 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -261,12 +261,14 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator } - serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST( + serviceRESTStorage, serviceStatusStorage, _, err := servicestore.NewGenericREST( restOptionsGetter, serviceClusterIPAllocator.IPFamily(), serviceIPAllocators, serviceNodePortAllocator, - endpointsStorage) + endpointsStorage, + podStorage.Pod, + c.ProxyTransport) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go index 8948207722c..30169021d08 100644 --- a/pkg/registry/core/service/storage/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -19,17 +19,14 @@ package storage import ( "context" "fmt" - "math/rand" "net" "net/http" "net/url" - "strconv" "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - utilnet "k8s.io/apimachinery/pkg/util/net" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/watch" @@ -84,6 +81,7 @@ type ServiceStorage interface { rest.Watcher rest.StorageVersionProvider rest.ResetFieldsStrategy + rest.Redirector } // NewREST returns a wrapper around the underlying generic storage and performs @@ -360,74 +358,7 @@ var _ = rest.Redirector(&REST{}) // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.RoundTripper, error) { - // Allow ID as "svcname", "svcname:port", or "scheme:svcname:port". - svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id) - if !valid { - return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) - } - - // If a port *number* was specified, find the corresponding service port name - if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil { - obj, err := rs.services.Get(ctx, svcName, &metav1.GetOptions{}) - if err != nil { - return nil, nil, err - } - svc := obj.(*api.Service) - found := false - for _, svcPort := range svc.Spec.Ports { - if int64(svcPort.Port) == portNum { - // use the declared port's name - portStr = svcPort.Name - found = true - break - } - } - if !found { - return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName)) - } - } - - obj, err := rs.endpoints.Get(ctx, svcName, &metav1.GetOptions{}) - if err != nil { - return nil, nil, err - } - eps := obj.(*api.Endpoints) - if len(eps.Subsets) == 0 { - return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName)) - } - // Pick a random Subset to start searching from. - ssSeed := rand.Intn(len(eps.Subsets)) - // Find a Subset that has the port. - for ssi := 0; ssi < len(eps.Subsets); ssi++ { - ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)] - if len(ss.Addresses) == 0 { - continue - } - for i := range ss.Ports { - if ss.Ports[i].Name == portStr { - addrSeed := rand.Intn(len(ss.Addresses)) - // This is a little wonky, but it's expensive to test for the presence of a Pod - // So we repeatedly try at random and validate it, this means that for an invalid - // service with a lot of endpoints we're going to potentially make a lot of calls, - // but in the expected case we'll only make one. - for try := 0; try < len(ss.Addresses); try++ { - addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)] - if err := isValidAddress(ctx, &addr, rs.pods); err != nil { - utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err)) - continue - } - ip := addr.IP - port := int(ss.Ports[i].Port) - return &url.URL{ - Scheme: svcScheme, - Host: net.JoinHostPort(ip, strconv.Itoa(port)), - }, rs.proxyTransport, nil - } - utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss)) - } - } - } - return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id)) + return rs.services.ResourceLocation(ctx, id) } func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index 5ca2ebee3ad..c79d7532aad 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "net" + "net/http" + "net/url" "reflect" "sort" "testing" @@ -40,7 +42,6 @@ import ( "k8s.io/apiserver/pkg/util/dryrun" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" - epstest "k8s.io/kubernetes/pkg/api/endpoints/testing" svctest "k8s.io/kubernetes/pkg/api/service/testing" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/features" @@ -178,14 +179,16 @@ func (s *serviceStorage) StorageVersion() runtime.GroupVersioner { // GetResetFields implements rest.ResetFieldsStrategy func (s *serviceStorage) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set { + //FIXME: should panic? return nil } -func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) { - return NewTestRESTWithPods(t, nil, nil, ipFamilies) +// ResourceLocation implements rest.Redirector +func (s *serviceStorage) ResourceLocation(ctx context.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error) { + panic("not implemented") } -func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Pod, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) { +func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") podStorage, err := podstore.NewStorage(generic.RESTOptions{ @@ -197,13 +200,7 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } - ctx := genericapirequest.NewDefaultContext() - for ix := range pods { - key, _ := podStorage.Pod.KeyFunc(ctx, pods[ix].Name) - if err := podStorage.Pod.Storage.Create(ctx, key, &pods[ix], nil, 0, false); err != nil { - t.Fatalf("Couldn't create pod: %v", err) - } - } + endpointStorage, err := endpointstore.NewREST(generic.RESTOptions{ StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "endpoints"}), Decorator: generic.UndecoratedStorage, @@ -212,12 +209,6 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } - for ix := range endpoints { - key, _ := endpointStorage.KeyFunc(ctx, endpoints[ix].Name) - if err := endpointStorage.Store.Storage.Create(ctx, key, endpoints[ix], nil, 0, false); err != nil { - t.Fatalf("Couldn't create endpoint: %v", err) - } - } var rPrimary ipallocator.Interface var rSecondary ipallocator.Interface @@ -281,7 +272,7 @@ func newInnerREST(t *testing.T, etcdStorage *storagebackend.ConfigForResource, i ResourcePrefix: "endpoints", }) - inner, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, portAlloc, endpoints) + inner, _, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, portAlloc, endpoints, nil, nil) if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } @@ -660,137 +651,6 @@ func makePod(name string, ips ...string) api.Pod { return p } -func TestServiceRegistryResourceLocation(t *testing.T) { - pods := []api.Pod{ - makePod("unnamed", "1.2.3.4", "1.2.3.5"), - makePod("named", "1.2.3.6", "1.2.3.7"), - makePod("no-endpoints", "9.9.9.9"), // to prove this does not get chosen - } - - endpoints := []*api.Endpoints{ - epstest.MakeEndpoints("unnamed", - []api.EndpointAddress{ - epstest.MakeEndpointAddress("1.2.3.4", "unnamed"), - }, - []api.EndpointPort{ - epstest.MakeEndpointPort("", 80), - }), - epstest.MakeEndpoints("unnamed2", - []api.EndpointAddress{ - epstest.MakeEndpointAddress("1.2.3.5", "unnamed"), - }, - []api.EndpointPort{ - epstest.MakeEndpointPort("", 80), - }), - epstest.MakeEndpoints("named", - []api.EndpointAddress{ - epstest.MakeEndpointAddress("1.2.3.6", "named"), - }, - []api.EndpointPort{ - epstest.MakeEndpointPort("p", 80), - epstest.MakeEndpointPort("q", 81), - }), - epstest.MakeEndpoints("no-endpoints", nil, nil), // to prove this does not get chosen - } - - storage, server := NewTestRESTWithPods(t, endpoints, pods, []api.IPFamily{api.IPv4Protocol}) - defer server.Terminate(t) - - ctx := genericapirequest.NewDefaultContext() - for _, name := range []string{"unnamed", "unnamed2", "no-endpoints"} { - _, err := storage.Create(ctx, - svctest.MakeService(name, svctest.SetPorts( - svctest.MakeServicePort("", 93, intstr.FromInt(80), api.ProtocolTCP))), - rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) - if err != nil { - t.Fatalf("unexpected error creating service %q: %v", name, err) - } - - } - _, err := storage.Create(ctx, - svctest.MakeService("named", svctest.SetPorts( - svctest.MakeServicePort("p", 93, intstr.FromInt(80), api.ProtocolTCP), - svctest.MakeServicePort("q", 76, intstr.FromInt(81), api.ProtocolTCP))), - rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) - if err != nil { - t.Fatalf("unexpected error creating service %q: %v", "named", err) - } - redirector := rest.Redirector(storage) - - cases := []struct { - query string - err bool - expect string - }{{ - query: "unnamed", - expect: "//1.2.3.4:80", - }, { - query: "unnamed:", - expect: "//1.2.3.4:80", - }, { - query: "unnamed:93", - expect: "//1.2.3.4:80", - }, { - query: "http:unnamed:", - expect: "http://1.2.3.4:80", - }, { - query: "http:unnamed:93", - expect: "http://1.2.3.4:80", - }, { - query: "unnamed:80", - err: true, - }, { - query: "unnamed2", - expect: "//1.2.3.5:80", - }, { - query: "named:p", - expect: "//1.2.3.6:80", - }, { - query: "named:q", - expect: "//1.2.3.6:81", - }, { - query: "named:93", - expect: "//1.2.3.6:80", - }, { - query: "named:76", - expect: "//1.2.3.6:81", - }, { - query: "http:named:p", - expect: "http://1.2.3.6:80", - }, { - query: "http:named:q", - expect: "http://1.2.3.6:81", - }, { - query: "named:bad", - err: true, - }, { - query: "no-endpoints", - err: true, - }, { - query: "non-existent", - err: true, - }} - for _, tc := range cases { - t.Run(tc.query, func(t *testing.T) { - location, _, err := redirector.ResourceLocation(ctx, tc.query) - if tc.err == false && err != nil { - t.Fatalf("unexpected error: %v", err) - } - if tc.err == true && err == nil { - t.Fatalf("unexpected success") - } - if !tc.err { - if location == nil { - t.Errorf("unexpected location: %v", location) - } - if e, a := tc.expect, location.String(); e != a { - t.Errorf("expected %q, but got %q", e, a) - } - } - }) - } -} - func TestServiceRegistryList(t *testing.T) { ctx := genericapirequest.NewDefaultContext() storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol}) diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index a385fc7a96e..dbd01c4b097 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -18,16 +18,24 @@ package storage import ( "context" + "fmt" + "math/rand" + "net" + "net/http" + "net/url" + "strconv" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilnet "k8s.io/apimachinery/pkg/util/net" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/util/dryrun" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/cri-api/pkg/errors" "k8s.io/klog/v2" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/features" @@ -48,12 +56,18 @@ type EndpointsStorage interface { rest.GracefulDeleter } +type PodStorage interface { + rest.Getter +} + type GenericREST struct { *genericregistry.Store primaryIPFamily api.IPFamily secondaryIPFamily api.IPFamily alloc RESTAllocStuff endpoints EndpointsStorage + pods PodStorage + proxyTransport http.RoundTripper } // NewGenericREST returns a RESTStorage object that will work against services. @@ -62,7 +76,9 @@ func NewGenericREST( serviceIPFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface, - endpoints EndpointsStorage) (*GenericREST, *StatusREST, error) { + endpoints EndpointsStorage, + pods PodStorage, + proxyTransport http.RoundTripper) (*GenericREST, *StatusREST, *svcreg.ProxyREST, error) { strategy, _ := svcreg.StrategyForServiceCIDRs(ipAllocs[serviceIPFamily].CIDR(), len(ipAllocs) > 1) @@ -81,7 +97,7 @@ func NewGenericREST( } options := &generic.StoreOptions{RESTOptions: optsGetter} if err := store.CompleteWithOptions(options); err != nil { - return nil, nil, err + return nil, nil, nil, err } statusStore := *store @@ -100,13 +116,15 @@ func NewGenericREST( secondaryIPFamily: secondaryIPFamily, alloc: makeAlloc(serviceIPFamily, ipAllocs, portAlloc), endpoints: endpoints, + pods: pods, + proxyTransport: proxyTransport, } store.Decorator = genericStore.defaultOnRead store.AfterDelete = genericStore.afterDelete store.BeginCreate = genericStore.beginCreate store.BeginUpdate = genericStore.beginUpdate - return genericStore, &StatusREST{store: &statusStore}, nil + return genericStore, &StatusREST{store: &statusStore}, &svcreg.ProxyREST{Redirector: genericStore, ProxyTransport: proxyTransport}, nil } // otherFamily returns the non-selected IPFamily. This assumes the input is @@ -352,3 +370,79 @@ func (r *GenericREST) beginUpdate(ctx context.Context, obj, oldObj runtime.Objec return finish, nil } + +// Implement Redirector. +var _ rest.Redirector = &GenericREST{} + +// ResourceLocation returns a URL to which one can send traffic for the specified service. +func (r *GenericREST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.RoundTripper, error) { + // Allow ID as "svcname", "svcname:port", or "scheme:svcname:port". + svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id) + if !valid { + return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) + } + + // If a port *number* was specified, find the corresponding service port name + if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil { + obj, err := r.Get(ctx, svcName, &metav1.GetOptions{}) + if err != nil { + return nil, nil, err + } + svc := obj.(*api.Service) + found := false + for _, svcPort := range svc.Spec.Ports { + if int64(svcPort.Port) == portNum { + // use the declared port's name + portStr = svcPort.Name + found = true + break + } + } + if !found { + return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName)) + } + } + + obj, err := r.endpoints.Get(ctx, svcName, &metav1.GetOptions{}) + if err != nil { + return nil, nil, err + } + eps := obj.(*api.Endpoints) + if len(eps.Subsets) == 0 { + return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName)) + } + // Pick a random Subset to start searching from. + ssSeed := rand.Intn(len(eps.Subsets)) + // Find a Subset that has the port. + for ssi := 0; ssi < len(eps.Subsets); ssi++ { + ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)] + if len(ss.Addresses) == 0 { + continue + } + for i := range ss.Ports { + if ss.Ports[i].Name == portStr { + addrSeed := rand.Intn(len(ss.Addresses)) + // This is a little wonky, but it's expensive to test for the presence of a Pod + // So we repeatedly try at random and validate it, this means that for an invalid + // service with a lot of endpoints we're going to potentially make a lot of calls, + // but in the expected case we'll only make one. + for try := 0; try < len(ss.Addresses); try++ { + addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)] + // TODO(thockin): do we really need this check? + if err := isValidAddress(ctx, &addr, r.pods); err != nil { + utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err)) + continue + } + ip := addr.IP + port := int(ss.Ports[i].Port) + return &url.URL{ + Scheme: svcScheme, + Host: net.JoinHostPort(ip, strconv.Itoa(port)), + }, r.proxyTransport, nil + } + utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss)) + } + } + } + return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id)) +} diff --git a/pkg/registry/core/service/storage/storage_test.go b/pkg/registry/core/service/storage/storage_test.go index f958b2e29ea..1b5247a7589 100644 --- a/pkg/registry/core/service/storage/storage_test.go +++ b/pkg/registry/core/service/storage/storage_test.go @@ -17,7 +17,6 @@ limitations under the License. package storage import ( - "context" "fmt" "net" "reflect" @@ -39,9 +38,12 @@ import ( etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" + epstest "k8s.io/kubernetes/pkg/api/endpoints/testing" svctest "k8s.io/kubernetes/pkg/api/service/testing" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/features" + endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" + podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" "k8s.io/kubernetes/pkg/registry/registrytest" @@ -64,16 +66,6 @@ func makePortAllocator(ports machineryutilnet.PortRange) portallocator.Interface return al } -type fakeEndpoints struct{} - -func (fakeEndpoints) Delete(_ context.Context, _ string, _ rest.ValidateObjectFunc, _ *metav1.DeleteOptions) (runtime.Object, bool, error) { - return nil, false, nil -} - -func (fakeEndpoints) Get(_ context.Context, _ string, _ *metav1.GetOptions) (runtime.Object, error) { - return nil, nil -} - func ipIsAllocated(t *testing.T, alloc ipallocator.Interface, ipstr string) bool { t.Helper() ip := netutils.ParseIPSloppy(ipstr) @@ -94,6 +86,10 @@ func portIsAllocated(t *testing.T, alloc portallocator.Interface, port int32) bo } func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) { + return newStorageWithPods(t, ipFamilies, nil, nil) +} + +func newStorageWithPods(t *testing.T, ipFamilies []api.IPFamily, pods []api.Pod, endpoints []*api.Endpoints) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{ StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}), @@ -118,7 +114,45 @@ func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusR portAlloc := makePortAllocator(*(machineryutilnet.ParsePortRangeOrDie("30000-32767"))) - serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, portAlloc, fakeEndpoints{}) + // Not all tests will specify pods and endpoints. + podStorage, err := podstore.NewStorage(generic.RESTOptions{ + StorageConfig: etcdStorage, + Decorator: generic.UndecoratedStorage, + DeleteCollectionWorkers: 3, + ResourcePrefix: "pods", + }, nil, nil, nil) + if err != nil { + t.Fatalf("unexpected error from REST storage: %v", err) + } + if pods != nil && len(pods) > 0 { + ctx := genericapirequest.NewDefaultContext() + for ix := range pods { + key, _ := podStorage.Pod.KeyFunc(ctx, pods[ix].Name) + if err := podStorage.Pod.Storage.Create(ctx, key, &pods[ix], nil, 0, false); err != nil { + t.Fatalf("Couldn't create pod: %v", err) + } + } + } + + endpointsStorage, err := endpointstore.NewREST(generic.RESTOptions{ + StorageConfig: etcdStorage, + Decorator: generic.UndecoratedStorage, + ResourcePrefix: "endpoints", + }) + if err != nil { + t.Fatalf("unexpected error from REST storage: %v", err) + } + if endpoints != nil && len(endpoints) > 0 { + ctx := genericapirequest.NewDefaultContext() + for ix := range endpoints { + key, _ := endpointsStorage.KeyFunc(ctx, endpoints[ix].Name) + if err := endpointsStorage.Store.Storage.Create(ctx, key, endpoints[ix], nil, 0, false); err != nil { + t.Fatalf("Couldn't create endpoint: %v", err) + } + } + } + + serviceStorage, statusStorage, _, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, portAlloc, endpointsStorage, podStorage.Pod, nil) if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } @@ -7229,3 +7263,137 @@ func TestFeatureExternalTrafficPolicy(t *testing.T) { // lbsourceranges, externalname, itp, PublishNotReadyAddresses, // ipfamilypolicy and list, // AllocateLoadBalancerNodePorts, LoadBalancerClass, status + +func TestServiceRegistryResourceLocation(t *testing.T) { + pods := []api.Pod{ + makePod("unnamed", "1.2.3.4", "1.2.3.5"), + makePod("named", "1.2.3.6", "1.2.3.7"), + makePod("no-endpoints", "9.9.9.9"), // to prove this does not get chosen + } + + endpoints := []*api.Endpoints{ + epstest.MakeEndpoints("unnamed", + []api.EndpointAddress{ + epstest.MakeEndpointAddress("1.2.3.4", "unnamed"), + }, + []api.EndpointPort{ + epstest.MakeEndpointPort("", 80), + }), + epstest.MakeEndpoints("unnamed2", + []api.EndpointAddress{ + epstest.MakeEndpointAddress("1.2.3.5", "unnamed"), + }, + []api.EndpointPort{ + epstest.MakeEndpointPort("", 80), + }), + epstest.MakeEndpoints("named", + []api.EndpointAddress{ + epstest.MakeEndpointAddress("1.2.3.6", "named"), + }, + []api.EndpointPort{ + epstest.MakeEndpointPort("p", 80), + epstest.MakeEndpointPort("q", 81), + }), + epstest.MakeEndpoints("no-endpoints", nil, nil), // to prove this does not get chosen + } + + storage, _, server := newStorageWithPods(t, []api.IPFamily{api.IPv4Protocol}, pods, endpoints) + defer server.Terminate(t) + defer storage.Store.DestroyFunc() + + ctx := genericapirequest.NewDefaultContext() + for _, name := range []string{"unnamed", "unnamed2", "no-endpoints"} { + _, err := storage.Create(ctx, + svctest.MakeService(name, + svctest.SetPorts( + svctest.MakeServicePort("", 93, intstr.FromInt(80), api.ProtocolTCP))), + rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Fatalf("unexpected error creating service %q: %v", name, err) + } + + } + _, err := storage.Create(ctx, + svctest.MakeService("named", + svctest.SetPorts( + svctest.MakeServicePort("p", 93, intstr.FromInt(80), api.ProtocolTCP), + svctest.MakeServicePort("q", 76, intstr.FromInt(81), api.ProtocolTCP))), + rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Fatalf("unexpected error creating service %q: %v", "named", err) + } + redirector := rest.Redirector(storage) + + cases := []struct { + query string + err bool + expect string + }{{ + query: "unnamed", + expect: "//1.2.3.4:80", + }, { + query: "unnamed:", + expect: "//1.2.3.4:80", + }, { + query: "unnamed:93", + expect: "//1.2.3.4:80", + }, { + query: "http:unnamed:", + expect: "http://1.2.3.4:80", + }, { + query: "http:unnamed:93", + expect: "http://1.2.3.4:80", + }, { + query: "unnamed:80", + err: true, + }, { + query: "unnamed2", + expect: "//1.2.3.5:80", + }, { + query: "named:p", + expect: "//1.2.3.6:80", + }, { + query: "named:q", + expect: "//1.2.3.6:81", + }, { + query: "named:93", + expect: "//1.2.3.6:80", + }, { + query: "named:76", + expect: "//1.2.3.6:81", + }, { + query: "http:named:p", + expect: "http://1.2.3.6:80", + }, { + query: "http:named:q", + expect: "http://1.2.3.6:81", + }, { + query: "named:bad", + err: true, + }, { + query: "no-endpoints", + err: true, + }, { + query: "non-existent", + err: true, + }} + for _, tc := range cases { + t.Run(tc.query, func(t *testing.T) { + location, _, err := redirector.ResourceLocation(ctx, tc.query) + if tc.err == false && err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tc.err == true && err == nil { + t.Fatalf("unexpected success") + } + if !tc.err { + if location == nil { + t.Errorf("unexpected location: %v", location) + } + if e, a := tc.expect, location.String(); e != a { + t.Errorf("expected %q, but got %q", e, a) + } + } + }) + } +}