mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Svc REST: Move ResourceLocation() to 'inner' layer
Part of the de-layering effort. Also move the test.
This commit is contained in:
parent
7887c4c8fc
commit
d1b83bad67
@ -261,12 +261,14 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator
|
||||
}
|
||||
|
||||
serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(
|
||||
serviceRESTStorage, serviceStatusStorage, _, err := servicestore.NewGenericREST(
|
||||
restOptionsGetter,
|
||||
serviceClusterIPAllocator.IPFamily(),
|
||||
serviceIPAllocators,
|
||||
serviceNodePortAllocator,
|
||||
endpointsStorage)
|
||||
endpointsStorage,
|
||||
podStorage.Pod,
|
||||
c.ProxyTransport)
|
||||
if err != nil {
|
||||
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
|
||||
}
|
||||
|
@ -19,17 +19,14 @@ package storage
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
@ -84,6 +81,7 @@ type ServiceStorage interface {
|
||||
rest.Watcher
|
||||
rest.StorageVersionProvider
|
||||
rest.ResetFieldsStrategy
|
||||
rest.Redirector
|
||||
}
|
||||
|
||||
// NewREST returns a wrapper around the underlying generic storage and performs
|
||||
@ -360,74 +358,7 @@ var _ = rest.Redirector(&REST{})
|
||||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified service.
|
||||
func (rs *REST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.RoundTripper, error) {
|
||||
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
|
||||
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
|
||||
if !valid {
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
|
||||
}
|
||||
|
||||
// If a port *number* was specified, find the corresponding service port name
|
||||
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
|
||||
obj, err := rs.services.Get(ctx, svcName, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
svc := obj.(*api.Service)
|
||||
found := false
|
||||
for _, svcPort := range svc.Spec.Ports {
|
||||
if int64(svcPort.Port) == portNum {
|
||||
// use the declared port's name
|
||||
portStr = svcPort.Name
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
|
||||
}
|
||||
}
|
||||
|
||||
obj, err := rs.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
eps := obj.(*api.Endpoints)
|
||||
if len(eps.Subsets) == 0 {
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
|
||||
}
|
||||
// Pick a random Subset to start searching from.
|
||||
ssSeed := rand.Intn(len(eps.Subsets))
|
||||
// Find a Subset that has the port.
|
||||
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
|
||||
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
|
||||
if len(ss.Addresses) == 0 {
|
||||
continue
|
||||
}
|
||||
for i := range ss.Ports {
|
||||
if ss.Ports[i].Name == portStr {
|
||||
addrSeed := rand.Intn(len(ss.Addresses))
|
||||
// This is a little wonky, but it's expensive to test for the presence of a Pod
|
||||
// So we repeatedly try at random and validate it, this means that for an invalid
|
||||
// service with a lot of endpoints we're going to potentially make a lot of calls,
|
||||
// but in the expected case we'll only make one.
|
||||
for try := 0; try < len(ss.Addresses); try++ {
|
||||
addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
|
||||
if err := isValidAddress(ctx, &addr, rs.pods); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
|
||||
continue
|
||||
}
|
||||
ip := addr.IP
|
||||
port := int(ss.Ports[i].Port)
|
||||
return &url.URL{
|
||||
Scheme: svcScheme,
|
||||
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
|
||||
}, rs.proxyTransport, nil
|
||||
}
|
||||
utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
|
||||
return rs.services.ResourceLocation(ctx, id)
|
||||
}
|
||||
|
||||
func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
|
||||
|
@ -20,6 +20,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
@ -40,7 +42,6 @@ import (
|
||||
"k8s.io/apiserver/pkg/util/dryrun"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
epstest "k8s.io/kubernetes/pkg/api/endpoints/testing"
|
||||
svctest "k8s.io/kubernetes/pkg/api/service/testing"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
@ -178,14 +179,16 @@ func (s *serviceStorage) StorageVersion() runtime.GroupVersioner {
|
||||
|
||||
// GetResetFields implements rest.ResetFieldsStrategy
|
||||
func (s *serviceStorage) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
|
||||
//FIXME: should panic?
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) {
|
||||
return NewTestRESTWithPods(t, nil, nil, ipFamilies)
|
||||
// ResourceLocation implements rest.Redirector
|
||||
func (s *serviceStorage) ResourceLocation(ctx context.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Pod, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) {
|
||||
func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) {
|
||||
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
|
||||
|
||||
podStorage, err := podstore.NewStorage(generic.RESTOptions{
|
||||
@ -197,13 +200,7 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error from REST storage: %v", err)
|
||||
}
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
for ix := range pods {
|
||||
key, _ := podStorage.Pod.KeyFunc(ctx, pods[ix].Name)
|
||||
if err := podStorage.Pod.Storage.Create(ctx, key, &pods[ix], nil, 0, false); err != nil {
|
||||
t.Fatalf("Couldn't create pod: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
endpointStorage, err := endpointstore.NewREST(generic.RESTOptions{
|
||||
StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "endpoints"}),
|
||||
Decorator: generic.UndecoratedStorage,
|
||||
@ -212,12 +209,6 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error from REST storage: %v", err)
|
||||
}
|
||||
for ix := range endpoints {
|
||||
key, _ := endpointStorage.KeyFunc(ctx, endpoints[ix].Name)
|
||||
if err := endpointStorage.Store.Storage.Create(ctx, key, endpoints[ix], nil, 0, false); err != nil {
|
||||
t.Fatalf("Couldn't create endpoint: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var rPrimary ipallocator.Interface
|
||||
var rSecondary ipallocator.Interface
|
||||
@ -281,7 +272,7 @@ func newInnerREST(t *testing.T, etcdStorage *storagebackend.ConfigForResource, i
|
||||
ResourcePrefix: "endpoints",
|
||||
})
|
||||
|
||||
inner, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, portAlloc, endpoints)
|
||||
inner, _, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, portAlloc, endpoints, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error from REST storage: %v", err)
|
||||
}
|
||||
@ -660,137 +651,6 @@ func makePod(name string, ips ...string) api.Pod {
|
||||
return p
|
||||
}
|
||||
|
||||
func TestServiceRegistryResourceLocation(t *testing.T) {
|
||||
pods := []api.Pod{
|
||||
makePod("unnamed", "1.2.3.4", "1.2.3.5"),
|
||||
makePod("named", "1.2.3.6", "1.2.3.7"),
|
||||
makePod("no-endpoints", "9.9.9.9"), // to prove this does not get chosen
|
||||
}
|
||||
|
||||
endpoints := []*api.Endpoints{
|
||||
epstest.MakeEndpoints("unnamed",
|
||||
[]api.EndpointAddress{
|
||||
epstest.MakeEndpointAddress("1.2.3.4", "unnamed"),
|
||||
},
|
||||
[]api.EndpointPort{
|
||||
epstest.MakeEndpointPort("", 80),
|
||||
}),
|
||||
epstest.MakeEndpoints("unnamed2",
|
||||
[]api.EndpointAddress{
|
||||
epstest.MakeEndpointAddress("1.2.3.5", "unnamed"),
|
||||
},
|
||||
[]api.EndpointPort{
|
||||
epstest.MakeEndpointPort("", 80),
|
||||
}),
|
||||
epstest.MakeEndpoints("named",
|
||||
[]api.EndpointAddress{
|
||||
epstest.MakeEndpointAddress("1.2.3.6", "named"),
|
||||
},
|
||||
[]api.EndpointPort{
|
||||
epstest.MakeEndpointPort("p", 80),
|
||||
epstest.MakeEndpointPort("q", 81),
|
||||
}),
|
||||
epstest.MakeEndpoints("no-endpoints", nil, nil), // to prove this does not get chosen
|
||||
}
|
||||
|
||||
storage, server := NewTestRESTWithPods(t, endpoints, pods, []api.IPFamily{api.IPv4Protocol})
|
||||
defer server.Terminate(t)
|
||||
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
for _, name := range []string{"unnamed", "unnamed2", "no-endpoints"} {
|
||||
_, err := storage.Create(ctx,
|
||||
svctest.MakeService(name, svctest.SetPorts(
|
||||
svctest.MakeServicePort("", 93, intstr.FromInt(80), api.ProtocolTCP))),
|
||||
rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating service %q: %v", name, err)
|
||||
}
|
||||
|
||||
}
|
||||
_, err := storage.Create(ctx,
|
||||
svctest.MakeService("named", svctest.SetPorts(
|
||||
svctest.MakeServicePort("p", 93, intstr.FromInt(80), api.ProtocolTCP),
|
||||
svctest.MakeServicePort("q", 76, intstr.FromInt(81), api.ProtocolTCP))),
|
||||
rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating service %q: %v", "named", err)
|
||||
}
|
||||
redirector := rest.Redirector(storage)
|
||||
|
||||
cases := []struct {
|
||||
query string
|
||||
err bool
|
||||
expect string
|
||||
}{{
|
||||
query: "unnamed",
|
||||
expect: "//1.2.3.4:80",
|
||||
}, {
|
||||
query: "unnamed:",
|
||||
expect: "//1.2.3.4:80",
|
||||
}, {
|
||||
query: "unnamed:93",
|
||||
expect: "//1.2.3.4:80",
|
||||
}, {
|
||||
query: "http:unnamed:",
|
||||
expect: "http://1.2.3.4:80",
|
||||
}, {
|
||||
query: "http:unnamed:93",
|
||||
expect: "http://1.2.3.4:80",
|
||||
}, {
|
||||
query: "unnamed:80",
|
||||
err: true,
|
||||
}, {
|
||||
query: "unnamed2",
|
||||
expect: "//1.2.3.5:80",
|
||||
}, {
|
||||
query: "named:p",
|
||||
expect: "//1.2.3.6:80",
|
||||
}, {
|
||||
query: "named:q",
|
||||
expect: "//1.2.3.6:81",
|
||||
}, {
|
||||
query: "named:93",
|
||||
expect: "//1.2.3.6:80",
|
||||
}, {
|
||||
query: "named:76",
|
||||
expect: "//1.2.3.6:81",
|
||||
}, {
|
||||
query: "http:named:p",
|
||||
expect: "http://1.2.3.6:80",
|
||||
}, {
|
||||
query: "http:named:q",
|
||||
expect: "http://1.2.3.6:81",
|
||||
}, {
|
||||
query: "named:bad",
|
||||
err: true,
|
||||
}, {
|
||||
query: "no-endpoints",
|
||||
err: true,
|
||||
}, {
|
||||
query: "non-existent",
|
||||
err: true,
|
||||
}}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.query, func(t *testing.T) {
|
||||
location, _, err := redirector.ResourceLocation(ctx, tc.query)
|
||||
if tc.err == false && err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if tc.err == true && err == nil {
|
||||
t.Fatalf("unexpected success")
|
||||
}
|
||||
if !tc.err {
|
||||
if location == nil {
|
||||
t.Errorf("unexpected location: %v", location)
|
||||
}
|
||||
if e, a := tc.expect, location.String(); e != a {
|
||||
t.Errorf("expected %q, but got %q", e, a)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceRegistryList(t *testing.T) {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol})
|
||||
|
@ -18,16 +18,24 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/util/dryrun"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/cri-api/pkg/errors"
|
||||
"k8s.io/klog/v2"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
@ -48,12 +56,18 @@ type EndpointsStorage interface {
|
||||
rest.GracefulDeleter
|
||||
}
|
||||
|
||||
type PodStorage interface {
|
||||
rest.Getter
|
||||
}
|
||||
|
||||
type GenericREST struct {
|
||||
*genericregistry.Store
|
||||
primaryIPFamily api.IPFamily
|
||||
secondaryIPFamily api.IPFamily
|
||||
alloc RESTAllocStuff
|
||||
endpoints EndpointsStorage
|
||||
pods PodStorage
|
||||
proxyTransport http.RoundTripper
|
||||
}
|
||||
|
||||
// NewGenericREST returns a RESTStorage object that will work against services.
|
||||
@ -62,7 +76,9 @@ func NewGenericREST(
|
||||
serviceIPFamily api.IPFamily,
|
||||
ipAllocs map[api.IPFamily]ipallocator.Interface,
|
||||
portAlloc portallocator.Interface,
|
||||
endpoints EndpointsStorage) (*GenericREST, *StatusREST, error) {
|
||||
endpoints EndpointsStorage,
|
||||
pods PodStorage,
|
||||
proxyTransport http.RoundTripper) (*GenericREST, *StatusREST, *svcreg.ProxyREST, error) {
|
||||
|
||||
strategy, _ := svcreg.StrategyForServiceCIDRs(ipAllocs[serviceIPFamily].CIDR(), len(ipAllocs) > 1)
|
||||
|
||||
@ -81,7 +97,7 @@ func NewGenericREST(
|
||||
}
|
||||
options := &generic.StoreOptions{RESTOptions: optsGetter}
|
||||
if err := store.CompleteWithOptions(options); err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
statusStore := *store
|
||||
@ -100,13 +116,15 @@ func NewGenericREST(
|
||||
secondaryIPFamily: secondaryIPFamily,
|
||||
alloc: makeAlloc(serviceIPFamily, ipAllocs, portAlloc),
|
||||
endpoints: endpoints,
|
||||
pods: pods,
|
||||
proxyTransport: proxyTransport,
|
||||
}
|
||||
store.Decorator = genericStore.defaultOnRead
|
||||
store.AfterDelete = genericStore.afterDelete
|
||||
store.BeginCreate = genericStore.beginCreate
|
||||
store.BeginUpdate = genericStore.beginUpdate
|
||||
|
||||
return genericStore, &StatusREST{store: &statusStore}, nil
|
||||
return genericStore, &StatusREST{store: &statusStore}, &svcreg.ProxyREST{Redirector: genericStore, ProxyTransport: proxyTransport}, nil
|
||||
}
|
||||
|
||||
// otherFamily returns the non-selected IPFamily. This assumes the input is
|
||||
@ -352,3 +370,79 @@ func (r *GenericREST) beginUpdate(ctx context.Context, obj, oldObj runtime.Objec
|
||||
|
||||
return finish, nil
|
||||
}
|
||||
|
||||
// Implement Redirector.
|
||||
var _ rest.Redirector = &GenericREST{}
|
||||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified service.
|
||||
func (r *GenericREST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.RoundTripper, error) {
|
||||
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
|
||||
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
|
||||
if !valid {
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
|
||||
}
|
||||
|
||||
// If a port *number* was specified, find the corresponding service port name
|
||||
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
|
||||
obj, err := r.Get(ctx, svcName, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
svc := obj.(*api.Service)
|
||||
found := false
|
||||
for _, svcPort := range svc.Spec.Ports {
|
||||
if int64(svcPort.Port) == portNum {
|
||||
// use the declared port's name
|
||||
portStr = svcPort.Name
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
|
||||
}
|
||||
}
|
||||
|
||||
obj, err := r.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
eps := obj.(*api.Endpoints)
|
||||
if len(eps.Subsets) == 0 {
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
|
||||
}
|
||||
// Pick a random Subset to start searching from.
|
||||
ssSeed := rand.Intn(len(eps.Subsets))
|
||||
// Find a Subset that has the port.
|
||||
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
|
||||
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
|
||||
if len(ss.Addresses) == 0 {
|
||||
continue
|
||||
}
|
||||
for i := range ss.Ports {
|
||||
if ss.Ports[i].Name == portStr {
|
||||
addrSeed := rand.Intn(len(ss.Addresses))
|
||||
// This is a little wonky, but it's expensive to test for the presence of a Pod
|
||||
// So we repeatedly try at random and validate it, this means that for an invalid
|
||||
// service with a lot of endpoints we're going to potentially make a lot of calls,
|
||||
// but in the expected case we'll only make one.
|
||||
for try := 0; try < len(ss.Addresses); try++ {
|
||||
addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
|
||||
// TODO(thockin): do we really need this check?
|
||||
if err := isValidAddress(ctx, &addr, r.pods); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
|
||||
continue
|
||||
}
|
||||
ip := addr.IP
|
||||
port := int(ss.Ports[i].Port)
|
||||
return &url.URL{
|
||||
Scheme: svcScheme,
|
||||
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
|
||||
}, r.proxyTransport, nil
|
||||
}
|
||||
utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
@ -39,9 +38,12 @@ import (
|
||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
epstest "k8s.io/kubernetes/pkg/api/endpoints/testing"
|
||||
svctest "k8s.io/kubernetes/pkg/api/service/testing"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
|
||||
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
|
||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||
@ -64,16 +66,6 @@ func makePortAllocator(ports machineryutilnet.PortRange) portallocator.Interface
|
||||
return al
|
||||
}
|
||||
|
||||
type fakeEndpoints struct{}
|
||||
|
||||
func (fakeEndpoints) Delete(_ context.Context, _ string, _ rest.ValidateObjectFunc, _ *metav1.DeleteOptions) (runtime.Object, bool, error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (fakeEndpoints) Get(_ context.Context, _ string, _ *metav1.GetOptions) (runtime.Object, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func ipIsAllocated(t *testing.T, alloc ipallocator.Interface, ipstr string) bool {
|
||||
t.Helper()
|
||||
ip := netutils.ParseIPSloppy(ipstr)
|
||||
@ -94,6 +86,10 @@ func portIsAllocated(t *testing.T, alloc portallocator.Interface, port int32) bo
|
||||
}
|
||||
|
||||
func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) {
|
||||
return newStorageWithPods(t, ipFamilies, nil, nil)
|
||||
}
|
||||
|
||||
func newStorageWithPods(t *testing.T, ipFamilies []api.IPFamily, pods []api.Pod, endpoints []*api.Endpoints) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) {
|
||||
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
|
||||
restOptions := generic.RESTOptions{
|
||||
StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}),
|
||||
@ -118,7 +114,45 @@ func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusR
|
||||
|
||||
portAlloc := makePortAllocator(*(machineryutilnet.ParsePortRangeOrDie("30000-32767")))
|
||||
|
||||
serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, portAlloc, fakeEndpoints{})
|
||||
// Not all tests will specify pods and endpoints.
|
||||
podStorage, err := podstore.NewStorage(generic.RESTOptions{
|
||||
StorageConfig: etcdStorage,
|
||||
Decorator: generic.UndecoratedStorage,
|
||||
DeleteCollectionWorkers: 3,
|
||||
ResourcePrefix: "pods",
|
||||
}, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error from REST storage: %v", err)
|
||||
}
|
||||
if pods != nil && len(pods) > 0 {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
for ix := range pods {
|
||||
key, _ := podStorage.Pod.KeyFunc(ctx, pods[ix].Name)
|
||||
if err := podStorage.Pod.Storage.Create(ctx, key, &pods[ix], nil, 0, false); err != nil {
|
||||
t.Fatalf("Couldn't create pod: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
endpointsStorage, err := endpointstore.NewREST(generic.RESTOptions{
|
||||
StorageConfig: etcdStorage,
|
||||
Decorator: generic.UndecoratedStorage,
|
||||
ResourcePrefix: "endpoints",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error from REST storage: %v", err)
|
||||
}
|
||||
if endpoints != nil && len(endpoints) > 0 {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
for ix := range endpoints {
|
||||
key, _ := endpointsStorage.KeyFunc(ctx, endpoints[ix].Name)
|
||||
if err := endpointsStorage.Store.Storage.Create(ctx, key, endpoints[ix], nil, 0, false); err != nil {
|
||||
t.Fatalf("Couldn't create endpoint: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
serviceStorage, statusStorage, _, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, portAlloc, endpointsStorage, podStorage.Pod, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error from REST storage: %v", err)
|
||||
}
|
||||
@ -7229,3 +7263,137 @@ func TestFeatureExternalTrafficPolicy(t *testing.T) {
|
||||
// lbsourceranges, externalname, itp, PublishNotReadyAddresses,
|
||||
// ipfamilypolicy and list,
|
||||
// AllocateLoadBalancerNodePorts, LoadBalancerClass, status
|
||||
|
||||
func TestServiceRegistryResourceLocation(t *testing.T) {
|
||||
pods := []api.Pod{
|
||||
makePod("unnamed", "1.2.3.4", "1.2.3.5"),
|
||||
makePod("named", "1.2.3.6", "1.2.3.7"),
|
||||
makePod("no-endpoints", "9.9.9.9"), // to prove this does not get chosen
|
||||
}
|
||||
|
||||
endpoints := []*api.Endpoints{
|
||||
epstest.MakeEndpoints("unnamed",
|
||||
[]api.EndpointAddress{
|
||||
epstest.MakeEndpointAddress("1.2.3.4", "unnamed"),
|
||||
},
|
||||
[]api.EndpointPort{
|
||||
epstest.MakeEndpointPort("", 80),
|
||||
}),
|
||||
epstest.MakeEndpoints("unnamed2",
|
||||
[]api.EndpointAddress{
|
||||
epstest.MakeEndpointAddress("1.2.3.5", "unnamed"),
|
||||
},
|
||||
[]api.EndpointPort{
|
||||
epstest.MakeEndpointPort("", 80),
|
||||
}),
|
||||
epstest.MakeEndpoints("named",
|
||||
[]api.EndpointAddress{
|
||||
epstest.MakeEndpointAddress("1.2.3.6", "named"),
|
||||
},
|
||||
[]api.EndpointPort{
|
||||
epstest.MakeEndpointPort("p", 80),
|
||||
epstest.MakeEndpointPort("q", 81),
|
||||
}),
|
||||
epstest.MakeEndpoints("no-endpoints", nil, nil), // to prove this does not get chosen
|
||||
}
|
||||
|
||||
storage, _, server := newStorageWithPods(t, []api.IPFamily{api.IPv4Protocol}, pods, endpoints)
|
||||
defer server.Terminate(t)
|
||||
defer storage.Store.DestroyFunc()
|
||||
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
for _, name := range []string{"unnamed", "unnamed2", "no-endpoints"} {
|
||||
_, err := storage.Create(ctx,
|
||||
svctest.MakeService(name,
|
||||
svctest.SetPorts(
|
||||
svctest.MakeServicePort("", 93, intstr.FromInt(80), api.ProtocolTCP))),
|
||||
rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating service %q: %v", name, err)
|
||||
}
|
||||
|
||||
}
|
||||
_, err := storage.Create(ctx,
|
||||
svctest.MakeService("named",
|
||||
svctest.SetPorts(
|
||||
svctest.MakeServicePort("p", 93, intstr.FromInt(80), api.ProtocolTCP),
|
||||
svctest.MakeServicePort("q", 76, intstr.FromInt(81), api.ProtocolTCP))),
|
||||
rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating service %q: %v", "named", err)
|
||||
}
|
||||
redirector := rest.Redirector(storage)
|
||||
|
||||
cases := []struct {
|
||||
query string
|
||||
err bool
|
||||
expect string
|
||||
}{{
|
||||
query: "unnamed",
|
||||
expect: "//1.2.3.4:80",
|
||||
}, {
|
||||
query: "unnamed:",
|
||||
expect: "//1.2.3.4:80",
|
||||
}, {
|
||||
query: "unnamed:93",
|
||||
expect: "//1.2.3.4:80",
|
||||
}, {
|
||||
query: "http:unnamed:",
|
||||
expect: "http://1.2.3.4:80",
|
||||
}, {
|
||||
query: "http:unnamed:93",
|
||||
expect: "http://1.2.3.4:80",
|
||||
}, {
|
||||
query: "unnamed:80",
|
||||
err: true,
|
||||
}, {
|
||||
query: "unnamed2",
|
||||
expect: "//1.2.3.5:80",
|
||||
}, {
|
||||
query: "named:p",
|
||||
expect: "//1.2.3.6:80",
|
||||
}, {
|
||||
query: "named:q",
|
||||
expect: "//1.2.3.6:81",
|
||||
}, {
|
||||
query: "named:93",
|
||||
expect: "//1.2.3.6:80",
|
||||
}, {
|
||||
query: "named:76",
|
||||
expect: "//1.2.3.6:81",
|
||||
}, {
|
||||
query: "http:named:p",
|
||||
expect: "http://1.2.3.6:80",
|
||||
}, {
|
||||
query: "http:named:q",
|
||||
expect: "http://1.2.3.6:81",
|
||||
}, {
|
||||
query: "named:bad",
|
||||
err: true,
|
||||
}, {
|
||||
query: "no-endpoints",
|
||||
err: true,
|
||||
}, {
|
||||
query: "non-existent",
|
||||
err: true,
|
||||
}}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.query, func(t *testing.T) {
|
||||
location, _, err := redirector.ResourceLocation(ctx, tc.query)
|
||||
if tc.err == false && err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if tc.err == true && err == nil {
|
||||
t.Fatalf("unexpected success")
|
||||
}
|
||||
if !tc.err {
|
||||
if location == nil {
|
||||
t.Errorf("unexpected location: %v", location)
|
||||
}
|
||||
if e, a := tc.expect, location.String(); e != a {
|
||||
t.Errorf("expected %q, but got %q", e, a)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user