mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-09 13:12:20 +00:00
Merge pull request #5805 from liggitt/node_proxy
Improve ResourceLocation API, allow proxy to use authenticated transport
This commit is contained in:
@@ -20,28 +20,32 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// REST adapts minion into apiserver's RESTStorage model.
|
||||
type REST struct {
|
||||
registry Registry
|
||||
registry Registry
|
||||
connection client.ConnectionInfoGetter
|
||||
}
|
||||
|
||||
// NewStorage returns a new rest.Storage implementation for minion.
|
||||
func NewStorage(m Registry) *REST {
|
||||
func NewStorage(m Registry, connection client.ConnectionInfoGetter) *REST {
|
||||
return &REST{
|
||||
registry: m,
|
||||
registry: m,
|
||||
connection: connection,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,13 +142,29 @@ func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selec
|
||||
return rs.registry.WatchMinions(ctx, label, field, resourceVersion)
|
||||
}
|
||||
|
||||
// Implement Redirector.
|
||||
var _ = rest.Redirector(&REST{})
|
||||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified minion.
|
||||
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
|
||||
func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
|
||||
minion, err := rs.registry.GetMinion(ctx, id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, nil, err
|
||||
}
|
||||
host := minion.Name
|
||||
// TODO: Minion webservers should be secure!
|
||||
return "http://" + net.JoinHostPort(host, strconv.Itoa(ports.KubeletPort)), nil
|
||||
|
||||
scheme, port, transport, err := rs.connection.GetConnectionInfo(host)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return &url.URL{
|
||||
Scheme: scheme,
|
||||
Host: net.JoinHostPort(
|
||||
host,
|
||||
strconv.FormatUint(uint64(port), 10),
|
||||
),
|
||||
},
|
||||
transport,
|
||||
nil
|
||||
}
|
||||
|
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package minion
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
@@ -27,8 +28,15 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
||||
)
|
||||
|
||||
type FakeConnectionInfoGetter struct {
|
||||
}
|
||||
|
||||
func (FakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
|
||||
return "http", 12345, nil, nil
|
||||
}
|
||||
|
||||
func TestMinionRegistryREST(t *testing.T) {
|
||||
ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||
ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
|
||||
ctx := api.NewContext()
|
||||
if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" {
|
||||
t.Errorf("missing expected object")
|
||||
@@ -88,7 +96,7 @@ func TestMinionRegistryREST(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMinionRegistryValidUpdate(t *testing.T) {
|
||||
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
|
||||
ctx := api.NewContext()
|
||||
obj, err := storage.Get(ctx, "foo")
|
||||
if err != nil {
|
||||
@@ -113,7 +121,7 @@ var (
|
||||
)
|
||||
|
||||
func TestMinionRegistryValidatesCreate(t *testing.T) {
|
||||
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
|
||||
ctx := api.NewContext()
|
||||
failureCases := map[string]api.Node{
|
||||
"zero-length Name": {
|
||||
@@ -156,7 +164,7 @@ func contains(nodes *api.NodeList, nodeID string) bool {
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
registry := registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})
|
||||
test := resttest.New(t, NewStorage(registry), registry.SetError).ClusterScope()
|
||||
test := resttest.New(t, NewStorage(registry, FakeConnectionInfoGetter{}), registry.SetError).ClusterScope()
|
||||
test.TestCreate(
|
||||
// valid
|
||||
&api.Node{
|
||||
|
@@ -18,6 +18,8 @@ package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
@@ -75,8 +77,11 @@ func NewStorage(h tools.EtcdHelper) (*REST, *BindingREST, *StatusREST) {
|
||||
return &REST{*store}, &BindingREST{store: store}, &StatusREST{store: &statusStore}
|
||||
}
|
||||
|
||||
// Implement Redirector.
|
||||
var _ = rest.Redirector(&REST{})
|
||||
|
||||
// ResourceLocation returns a pods location from its HostIP
|
||||
func (r *REST) ResourceLocation(ctx api.Context, name string) (string, error) {
|
||||
func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.RoundTripper, error) {
|
||||
return pod.ResourceLocation(r, ctx, name)
|
||||
}
|
||||
|
||||
|
@@ -683,13 +683,19 @@ func TestResourceLocation(t *testing.T) {
|
||||
storage = storage.WithPodStatus(cache)
|
||||
|
||||
redirector := rest.Redirector(storage)
|
||||
location, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
|
||||
location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if location == nil {
|
||||
t.Errorf("Unexpected nil: %v", location)
|
||||
}
|
||||
|
||||
if location != tc.location {
|
||||
t.Errorf("Expected %v, but got %v", tc.location, location)
|
||||
if location.Scheme != "" {
|
||||
t.Errorf("Expected '%v', but got '%v'", "", location.Scheme)
|
||||
}
|
||||
if location.Host != tc.location {
|
||||
t.Errorf("Expected %v, but got %v", tc.location, location.Host)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -18,6 +18,9 @@ package pod
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
@@ -151,12 +154,12 @@ type ResourceGetter interface {
|
||||
}
|
||||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified pod.
|
||||
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string, error) {
|
||||
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
|
||||
// Allow ID as "podname" or "podname:port". If port is not specified,
|
||||
// try to use the first defined port on the pod.
|
||||
parts := strings.Split(id, ":")
|
||||
if len(parts) > 2 {
|
||||
return "", errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id))
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id))
|
||||
}
|
||||
name := parts[0]
|
||||
port := ""
|
||||
@@ -167,11 +170,11 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string
|
||||
|
||||
obj, err := getter.Get(ctx, name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, nil, err
|
||||
}
|
||||
pod := obj.(*api.Pod)
|
||||
if pod == nil {
|
||||
return "", nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// Try to figure out a port.
|
||||
@@ -186,9 +189,11 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string
|
||||
|
||||
// We leave off the scheme ('http://') because we have no idea what sort of server
|
||||
// is listening at this endpoint.
|
||||
loc := pod.Status.PodIP
|
||||
if port != "" {
|
||||
loc += fmt.Sprintf(":%s", port)
|
||||
loc := &url.URL{}
|
||||
if port == "" {
|
||||
loc.Host = pod.Status.PodIP
|
||||
} else {
|
||||
loc.Host = net.JoinHostPort(pod.Status.PodIP, port)
|
||||
}
|
||||
return loc, nil
|
||||
return loc, nil, nil
|
||||
}
|
||||
|
@@ -20,6 +20,8 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
@@ -223,19 +225,24 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
|
||||
return out, false, err
|
||||
}
|
||||
|
||||
// Implement Redirector.
|
||||
var _ = rest.Redirector(&REST{})
|
||||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified service.
|
||||
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
|
||||
func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
|
||||
eps, err := rs.registry.GetEndpoints(ctx, id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, nil, err
|
||||
}
|
||||
if len(eps.Endpoints) == 0 {
|
||||
return "", fmt.Errorf("no endpoints available for %v", id)
|
||||
return nil, nil, fmt.Errorf("no endpoints available for %v", id)
|
||||
}
|
||||
// We leave off the scheme ('http://') because we have no idea what sort of server
|
||||
// is listening at this endpoint.
|
||||
ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
|
||||
return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil
|
||||
return &url.URL{
|
||||
Host: net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)),
|
||||
}, nil, nil
|
||||
}
|
||||
|
||||
func (rs *REST) getLoadbalancerName(ctx api.Context, service *api.Service) string {
|
||||
|
@@ -385,11 +385,14 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
|
||||
},
|
||||
})
|
||||
redirector := rest.Redirector(storage)
|
||||
location, err := redirector.ResourceLocation(ctx, "foo")
|
||||
location, _, err := redirector.ResourceLocation(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if e, a := "foo:80", location; e != a {
|
||||
if location == nil {
|
||||
t.Errorf("Unexpected nil: %v", location)
|
||||
}
|
||||
if e, a := "//foo:80", location.String(); e != a {
|
||||
t.Errorf("Expected %v, but got %v", e, a)
|
||||
}
|
||||
if e, a := "foo", registry.GottenID; e != a {
|
||||
@@ -398,7 +401,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
|
||||
|
||||
// Test error path
|
||||
registry.Err = fmt.Errorf("fake error")
|
||||
if _, err = redirector.ResourceLocation(ctx, "foo"); err == nil {
|
||||
if _, _, err = redirector.ResourceLocation(ctx, "foo"); err == nil {
|
||||
t.Errorf("unexpected nil error")
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user