diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index cab9ffb6c06..854c23acef9 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -52,7 +52,6 @@ import ( "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/genericapiserver/authorizer" genericvalidation "k8s.io/kubernetes/pkg/genericapiserver/validation" - kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/serviceaccount" @@ -138,11 +137,6 @@ func Run(s *options.APIServer) error { // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} - kubeletClient, err := kubeletclient.NewStaticKubeletClient(&s.KubeletConfig) - if err != nil { - glog.Fatalf("Failed to start kubelet client: %v", err) - } - if s.StorageConfig.DeserializationCacheSize == 0 { // When size of cache is not explicitly set, estimate its size based on // target memory usage. @@ -319,7 +313,7 @@ func Run(s *options.APIServer) error { EnableCoreControllers: true, DeleteCollectionWorkers: s.DeleteCollectionWorkers, EventTTL: s.EventTTL, - KubeletClient: kubeletClient, + KubeletClientConfig: s.KubeletConfig, EnableUISupport: true, EnableLogsSupport: true, diff --git a/pkg/kubelet/client/kubelet_client.go b/pkg/kubelet/client/kubelet_client.go index 8cb5937f679..f4744b2d7da 100644 --- a/pkg/kubelet/client/kubelet_client.go +++ b/pkg/kubelet/client/kubelet_client.go @@ -17,19 +17,17 @@ limitations under the License. package client import ( - "errors" - "fmt" "net" "net/http" - "strings" + "strconv" "time" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/transport" "k8s.io/kubernetes/pkg/types" utilnet "k8s.io/kubernetes/pkg/util/net" + nodeutil "k8s.io/kubernetes/pkg/util/node" ) type KubeletClientConfig struct { @@ -50,19 +48,17 @@ type KubeletClientConfig struct { Dial func(net, addr string) (net.Conn, error) } -// KubeletClient is an interface for all kubelet functionality -type KubeletClient interface { - GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, port uint, transport http.RoundTripper, err error) +// ConnectionInfo provides the information needed to connect to a kubelet +type ConnectionInfo struct { + Scheme string + Hostname string + Port string + Transport http.RoundTripper } +// ConnectionInfoGetter provides ConnectionInfo for the kubelet running on a named node type ConnectionInfoGetter interface { - GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, host string, port uint, transport http.RoundTripper, err error) -} - -// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP. -type HTTPKubeletClient struct { - Client *http.Client - Config *KubeletClientConfig + GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (*ConnectionInfo, error) } func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) { @@ -82,43 +78,6 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) { return transport.HTTPWrappersForConfig(config.transportConfig(), rt) } -// TODO: this structure is questionable, it should be using client.Config and overriding defaults. -func NewStaticKubeletClient(config *KubeletClientConfig) (KubeletClient, error) { - transport, err := MakeTransport(config) - if err != nil { - return nil, err - } - c := &http.Client{ - Transport: transport, - Timeout: config.HTTPTimeout, - } - return &HTTPKubeletClient{ - Client: c, - Config: config, - }, nil -} - -// In default HTTPKubeletClient ctx is unused. -func (c *HTTPKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) { - if errs := validation.ValidateNodeName(string(nodeName), false); len(errs) != 0 { - return "", 0, nil, fmt.Errorf("invalid node name: %s", strings.Join(errs, ";")) - } - scheme := "http" - if c.Config.EnableHttps { - scheme = "https" - } - return scheme, c.Config.Port, c.Client.Transport, nil -} - -// FakeKubeletClient is a fake implementation of KubeletClient which returns an error -// when called. It is useful to pass to the master in a test configuration with -// no kubelets. -type FakeKubeletClient struct{} - -func (c FakeKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) { - return "", 0, nil, errors.New("Not Implemented") -} - // transportConfig converts a client config to an appropriate transport config. func (c *KubeletClientConfig) transportConfig() *transport.Config { cfg := &transport.Config{ @@ -137,3 +96,73 @@ func (c *KubeletClientConfig) transportConfig() *transport.Config { } return cfg } + +// NodeGetter defines an interface for looking up a node by name +type NodeGetter interface { + Get(name string) (*api.Node, error) +} + +// NodeGetterFunc allows implementing NodeGetter with a function +type NodeGetterFunc func(name string) (*api.Node, error) + +func (f NodeGetterFunc) Get(name string) (*api.Node, error) { + return f(name) +} + +// NodeConnectionInfoGetter obtains connection info from the status of a Node API object +type NodeConnectionInfoGetter struct { + // nodes is used to look up Node objects + nodes NodeGetter + // scheme is the scheme to use to connect to all kubelets + scheme string + // defaultPort is the port to use if no Kubelet endpoint port is recorded in the node status + defaultPort int + // transport is the transport to use to send a request to all kubelets + transport http.RoundTripper +} + +func NewNodeConnectionInfoGetter(nodes NodeGetter, config KubeletClientConfig) (ConnectionInfoGetter, error) { + scheme := "http" + if config.EnableHttps { + scheme = "https" + } + + transport, err := MakeTransport(&config) + if err != nil { + return nil, err + } + + return &NodeConnectionInfoGetter{ + nodes: nodes, + scheme: scheme, + defaultPort: int(config.Port), + transport: transport, + }, nil +} + +func (k *NodeConnectionInfoGetter) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (*ConnectionInfo, error) { + node, err := k.nodes.Get(string(nodeName)) + if err != nil { + return nil, err + } + + // Find a kubelet-reported address, using preferred address type + hostIP, err := nodeutil.GetNodeHostIP(node) + if err != nil { + return nil, err + } + host := hostIP.String() + + // Use the kubelet-reported port, if present + port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) + if port <= 0 { + port = k.defaultPort + } + + return &ConnectionInfo{ + Scheme: k.scheme, + Hostname: host, + Port: strconv.Itoa(port), + Transport: k.transport, + }, nil +} diff --git a/pkg/kubelet/client/kubelet_client_test.go b/pkg/kubelet/client/kubelet_client_test.go index b1a548746db..6ef2ad3913c 100644 --- a/pkg/kubelet/client/kubelet_client_test.go +++ b/pkg/kubelet/client/kubelet_client_test.go @@ -17,50 +17,17 @@ limitations under the License. package client import ( - "encoding/json" - "net/http/httptest" - "net/url" "testing" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/restclient" - "k8s.io/kubernetes/pkg/probe" - utiltesting "k8s.io/kubernetes/pkg/util/testing" ) -func TestHTTPKubeletClient(t *testing.T) { - expectObj := probe.Success - body, err := json.Marshal(expectObj) - if err != nil { - t.Errorf("unexpected error: %v", err) - } +// Ensure a node client can be used as a NodeGetter. +// This allows anyone with a node client to easily construct a NewNodeConnectionInfoGetter. +var _ = NodeGetter(unversioned.NodeInterface(nil)) - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - - if _, err := url.Parse(testServer.URL); err != nil { - t.Errorf("unexpected error: %v", err) - } -} - -func TestNewKubeletClient(t *testing.T) { - config := &KubeletClientConfig{ - EnableHttps: false, - } - - client, err := NewStaticKubeletClient(config) - if err != nil { - t.Errorf("Error while trying to create a client: %v", err) - } - if client == nil { - t.Error("client is nil.") - } -} - -func TestNewKubeletClientTLSInvalid(t *testing.T) { +func TestMakeTransportInvalid(t *testing.T) { config := &KubeletClientConfig{ EnableHttps: true, //Invalid certificate and key path @@ -71,16 +38,16 @@ func TestNewKubeletClientTLSInvalid(t *testing.T) { }, } - client, err := NewStaticKubeletClient(config) + rt, err := MakeTransport(config) if err == nil { t.Errorf("Expected an error") } - if client != nil { - t.Error("client should be nil as we provided invalid cert file") + if rt != nil { + t.Error("rt should be nil as we provided invalid cert file") } } -func TestNewKubeletClientTLSValid(t *testing.T) { +func TestMakeTransportValid(t *testing.T) { config := &KubeletClientConfig{ Port: 1234, EnableHttps: true, @@ -93,34 +60,11 @@ func TestNewKubeletClientTLSValid(t *testing.T) { }, } - client, err := NewStaticKubeletClient(config) + rt, err := MakeTransport(config) if err != nil { t.Errorf("Not expecting an error #%v", err) } - if client == nil { - t.Error("client should not be nil") - } - - { - scheme, port, transport, err := client.GetRawConnectionInfo(nil, "foo") - if err != nil { - t.Errorf("Error getting info: %v", err) - } - if scheme != "https" { - t.Errorf("Expected https, got %s", scheme) - } - if port != 1234 { - t.Errorf("Expected 1234, got %d", port) - } - if transport == nil { - t.Errorf("Expected transport, got nil") - } - } - - { - _, _, _, err := client.GetRawConnectionInfo(nil, "foo bar") - if err == nil { - t.Errorf("Expected error getting connection info for invalid node name, got none") - } + if rt == nil { + t.Error("rt should not be nil") } } diff --git a/pkg/master/master.go b/pkg/master/master.go index 06d434b6527..dbeef921e17 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "net/url" + "reflect" "strconv" "strings" "sync" @@ -100,7 +101,7 @@ type Config struct { EndpointReconcilerConfig EndpointReconcilerConfig DeleteCollectionWorkers int EventTTL time.Duration - KubeletClient kubeletclient.KubeletClient + KubeletClientConfig kubeletclient.KubeletClientConfig // genericapiserver.RESTStorageProviders provides RESTStorage building methods keyed by groupName RESTStorageProviders map[string]genericapiserver.RESTStorageProvider // Used to start and monitor tunneling @@ -179,10 +180,10 @@ func (c *Config) SkipComplete() completedConfig { // New returns a new instance of Master from the given config. // Certain config fields will be set to a default value if unset. // Certain config fields must be specified, including: -// KubeletClient +// KubeletClientConfig func (c completedConfig) New() (*Master, error) { - if c.KubeletClient == nil { - return nil, fmt.Errorf("Master.New() called with config.KubeletClient == nil") + if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) { + return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig") } s, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time @@ -220,7 +221,7 @@ func (c completedConfig) New() (*Master, error) { legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{ StorageFactory: c.StorageFactory, ProxyTransport: s.ProxyTransport, - KubeletClient: c.KubeletClient, + KubeletClientConfig: c.KubeletClientConfig, EventTTL: c.EventTTL, ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange, ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange, diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index c33f7c80ac1..c697959e79a 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -49,7 +49,7 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" openapigen "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/genericapiserver" - "k8s.io/kubernetes/pkg/kubelet/client" + kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" @@ -89,7 +89,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}} config.GenericConfig.APIResourceConfigSource = DefaultAPIResourceConfigSource() config.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4") - config.KubeletClient = client.FakeKubeletClient{} config.GenericConfig.LegacyAPIGroupPrefixes = sets.NewString("/api") config.GenericConfig.APIGroupPrefix = "/apis" config.GenericConfig.APIResourceConfigSource = DefaultAPIResourceConfigSource() @@ -98,6 +97,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. config.GenericConfig.RequestContextMapper = api.NewRequestContextMapper() config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}} config.EnableCoreControllers = false + config.KubeletClientConfig = kubeletclient.KubeletClientConfig{Port: 10250} master, err := config.Complete().New() if err != nil { diff --git a/pkg/registry/core/node/etcd/etcd.go b/pkg/registry/core/node/etcd/etcd.go index 64893e34982..ed5e0bae7ad 100644 --- a/pkg/registry/core/node/etcd/etcd.go +++ b/pkg/registry/core/node/etcd/etcd.go @@ -30,8 +30,6 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/types" - nodeutil "k8s.io/kubernetes/pkg/util/node" ) // NodeStorage includes storage for nodes and all sub resources @@ -39,11 +37,13 @@ type NodeStorage struct { Node *REST Status *StatusREST Proxy *noderest.ProxyREST + + KubeletConnectionInfo client.ConnectionInfoGetter } type REST struct { *registry.Store - connection client.KubeletClient + connection client.ConnectionInfoGetter proxyTransport http.RoundTripper } @@ -67,7 +67,7 @@ func (r *StatusREST) Update(ctx api.Context, name string, objInfo rest.UpdatedOb } // NewStorage returns a NodeStorage object that will work against nodes. -func NewStorage(opts generic.RESTOptions, connection client.KubeletClient, proxyTransport http.RoundTripper) NodeStorage { +func NewStorage(opts generic.RESTOptions, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.NodeList{} } @@ -109,13 +109,36 @@ func NewStorage(opts generic.RESTOptions, connection client.KubeletClient, proxy statusStore := *store statusStore.UpdateStrategy = node.StatusStrategy - nodeREST := &REST{store, connection, proxyTransport} + // Set up REST handlers + nodeREST := &REST{Store: store, proxyTransport: proxyTransport} + statusREST := &StatusREST{store: &statusStore} + proxyREST := &noderest.ProxyREST{Store: store, ProxyTransport: proxyTransport} - return NodeStorage{ - Node: nodeREST, - Status: &StatusREST{store: &statusStore}, - Proxy: &noderest.ProxyREST{Store: store, Connection: client.ConnectionInfoGetter(nodeREST), ProxyTransport: proxyTransport}, + // Build a NodeGetter that looks up nodes using the REST handler + nodeGetter := client.NodeGetterFunc(func(nodeName string) (*api.Node, error) { + obj, err := nodeREST.Get(api.NewContext(), nodeName) + if err != nil { + return nil, err + } + node, ok := obj.(*api.Node) + if !ok { + return nil, fmt.Errorf("unexpected type %T", obj) + } + return node, nil + }) + connectionInfoGetter, err := client.NewNodeConnectionInfoGetter(nodeGetter, kubeletClientConfig) + if err != nil { + return nil, err } + nodeREST.connection = connectionInfoGetter + proxyREST.Connection = connectionInfoGetter + + return &NodeStorage{ + Node: nodeREST, + Status: statusREST, + Proxy: proxyREST, + KubeletConnectionInfo: connectionInfoGetter, + }, nil } // Implement Redirector. @@ -123,36 +146,5 @@ var _ = rest.Redirector(&REST{}) // ResourceLocation returns a URL to which one can send traffic for the specified node. func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { - return node.ResourceLocation(r, r, r.proxyTransport, ctx, id) -} - -var _ = client.ConnectionInfoGetter(&REST{}) - -func (r *REST) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, string, uint, http.RoundTripper, error) { - scheme, port, transport, err := r.connection.GetRawConnectionInfo(ctx, nodeName) - if err != nil { - return "", "", 0, nil, err - } - - // We probably shouldn't care about context when looking for Node object. - obj, err := r.Get(ctx, string(nodeName)) - if err != nil { - return "", "", 0, nil, err - } - node, ok := obj.(*api.Node) - if !ok { - return "", "", 0, nil, fmt.Errorf("Unexpected object type: %#v", node) - } - - hostIP, err := nodeutil.GetNodeHostIP(node) - if err != nil { - return "", "", 0, nil, err - } - host := hostIP.String() - - daemonPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) - if daemonPort > 0 { - return scheme, host, uint(daemonPort), transport, nil - } - return scheme, host, port, transport, nil + return node.ResourceLocation(r, r.connection, r.proxyTransport, ctx, id) } diff --git a/pkg/registry/core/node/etcd/etcd_test.go b/pkg/registry/core/node/etcd/etcd_test.go index 35200e3a59a..d49d8f88f83 100644 --- a/pkg/registry/core/node/etcd/etcd_test.go +++ b/pkg/registry/core/node/etcd/etcd_test.go @@ -17,31 +17,26 @@ limitations under the License. package etcd import ( - "net/http" "testing" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/fields" + kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" - "k8s.io/kubernetes/pkg/types" ) -type fakeConnectionInfoGetter struct { -} - -func (fakeConnectionInfoGetter) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) { - return "http", 12345, nil, nil -} - func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1} - storage := NewStorage(restOptions, fakeConnectionInfoGetter{}, nil) + storage, err := NewStorage(restOptions, kubeletclient.KubeletClientConfig{}, nil) + if err != nil { + t.Fatal(err) + } return storage.Node, server } diff --git a/pkg/registry/core/node/strategy.go b/pkg/registry/core/node/strategy.go index c838617aac4..79061f60a4b 100644 --- a/pkg/registry/core/node/strategy.go +++ b/pkg/registry/core/node/strategy.go @@ -21,7 +21,6 @@ import ( "net" "net/http" "net/url" - "strconv" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -29,13 +28,11 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/runtime" pkgstorage "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/types" utilnet "k8s.io/kubernetes/pkg/util/net" - nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/validation/field" ) @@ -176,39 +173,23 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid node request %q", id)) } - nodeObj, err := getter.Get(ctx, name) + info, err := connection.GetConnectionInfo(ctx, types.NodeName(name)) if err != nil { return nil, nil, err } - node := nodeObj.(*api.Node) - hostIP, err := nodeutil.GetNodeHostIP(node) - if err != nil { - return nil, nil, err - } - host := hostIP.String() // We check if we want to get a default Kubelet's transport. It happens if either: - // - no port is specified in request (Kubelet's port is default), - // - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint, - // - there's no information in the API about DaemonEndpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config) - kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port - if kubeletPort == 0 { - kubeletPort = ports.KubeletPort - } - if portReq == "" || strconv.Itoa(int(kubeletPort)) == portReq { - scheme, host, port, kubeletTransport, err := connection.GetConnectionInfo(ctx, types.NodeName(node.Name)) - if err != nil { - return nil, nil, err - } + // - no port is specified in request (Kubelet's port is default) + // - the requested port matches the kubelet port for this node + if portReq == "" || portReq == info.Port { return &url.URL{ - Scheme: scheme, - Host: net.JoinHostPort( - host, - strconv.FormatUint(uint64(port), 10), - ), + Scheme: info.Scheme, + Host: net.JoinHostPort(info.Hostname, info.Port), }, - kubeletTransport, + info.Transport, nil } - return &url.URL{Scheme: schemeReq, Host: net.JoinHostPort(host, portReq)}, proxyTransport, nil + + // Otherwise, return the requested scheme and port, and the proxy transport + return &url.URL{Scheme: schemeReq, Host: net.JoinHostPort(info.Hostname, portReq)}, proxyTransport, nil } diff --git a/pkg/registry/core/pod/strategy.go b/pkg/registry/core/pod/strategy.go index 0f6376fb2d1..80130ce9c63 100644 --- a/pkg/registry/core/pod/strategy.go +++ b/pkg/registry/core/pod/strategy.go @@ -307,7 +307,7 @@ func LogLocation( // If pod has not been assigned a host, return an empty location return nil, nil, nil } - nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName) + nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName) if err != nil { return nil, nil, err } @@ -334,12 +334,12 @@ func LogLocation( params.Add("limitBytes", strconv.FormatInt(*opts.LimitBytes, 10)) } loc := &url.URL{ - Scheme: nodeScheme, - Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Scheme: nodeInfo.Scheme, + Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port), Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, pod.Name, container), RawQuery: params.Encode(), } - return loc, nodeTransport, nil + return loc, nodeInfo.Transport, nil } func podHasContainerWithName(pod *api.Pod, containerName string) bool { @@ -458,7 +458,7 @@ func streamLocation( // If pod has not been assigned a host, return an empty location return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name)) } - nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName) + nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName) if err != nil { return nil, nil, err } @@ -467,12 +467,12 @@ func streamLocation( return nil, nil, err } loc := &url.URL{ - Scheme: nodeScheme, - Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Scheme: nodeInfo.Scheme, + Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port), Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container), RawQuery: params.Encode(), } - return loc, nodeTransport, nil + return loc, nodeInfo.Transport, nil } // PortForwardLocation returns the port-forward URL for a pod. @@ -492,14 +492,14 @@ func PortForwardLocation( // If pod has not been assigned a host, return an empty location return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name)) } - nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName) + nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName) if err != nil { return nil, nil, err } loc := &url.URL{ - Scheme: nodeScheme, - Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Scheme: nodeInfo.Scheme, + Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port), Path: fmt.Sprintf("/portForward/%s/%s", pod.Namespace, pod.Name), } - return loc, nodeTransport, nil + return loc, nodeInfo.Transport, nil } diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 65135b85a94..d67a1918cd6 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -64,9 +64,9 @@ import ( type LegacyRESTStorageProvider struct { StorageFactory genericapiserver.StorageFactory // Used for custom proxy dialing, and proxy TLS options - ProxyTransport http.RoundTripper - KubeletClient kubeletclient.KubeletClient - EventTTL time.Duration + ProxyTransport http.RoundTripper + KubeletClientConfig kubeletclient.KubeletClientConfig + EventTTL time.Duration // ServiceClusterIPRange is used to build cluster IPs for discovery. ServiceClusterIPRange *net.IPNet @@ -135,12 +135,15 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints"))) restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage) - nodeStorage := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClient, c.ProxyTransport) + nodeStorage, err := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClientConfig, c.ProxyTransport) + if err != nil { + return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err + } restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node) podStorage := podetcd.NewStorage( restOptionsGetter(api.Resource("pods")), - kubeletclient.ConnectionInfoGetter(nodeStorage.Node), + nodeStorage.KubeletConnectionInfo, c.ProxyTransport, podDisruptionClient, ) diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index d39ecf377e3..0665c36a42e 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -357,7 +357,7 @@ func NewMasterConfig() *master.Config { StorageFactory: storageFactory, EnableCoreControllers: true, EnableWatchCache: true, - KubeletClient: kubeletclient.FakeKubeletClient{}, + KubeletClientConfig: kubeletclient.KubeletClientConfig{Port: 10250}, } }