mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-06 07:57:35 +00:00
Merge pull request #34474 from liggitt/connection-info-refactor
Automatic merge from submit-queue Remove static kubelet client, refactor ConnectionInfoGetter Follow up to https://github.com/kubernetes/kubernetes/pull/33718 * Collapses the multi-valued return to a `ConnectionInfo` struct * Removes the "raw" connection info method and interface, since it was only used in a single non-test location (by the "real" connection info method) * Disentangles the node REST object from being a ConnectionInfoProvider itself by extracting an implementation of ConnectionInfoProvider that takes a node (using a provided NodeGetter) and determines ConnectionInfo * Plumbs the KubeletClientConfig to the point where we construct the helper object that combines the config and the node lookup. I anticipate adding a preference order for choosing an address type in https://github.com/kubernetes/kubernetes/pull/34259
This commit is contained in:
@@ -30,8 +30,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
"k8s.io/kubernetes/pkg/registry/generic/registry"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||
)
|
||||
|
||||
// NodeStorage includes storage for nodes and all sub resources
|
||||
@@ -39,11 +37,13 @@ type NodeStorage struct {
|
||||
Node *REST
|
||||
Status *StatusREST
|
||||
Proxy *noderest.ProxyREST
|
||||
|
||||
KubeletConnectionInfo client.ConnectionInfoGetter
|
||||
}
|
||||
|
||||
type REST struct {
|
||||
*registry.Store
|
||||
connection client.KubeletClient
|
||||
connection client.ConnectionInfoGetter
|
||||
proxyTransport http.RoundTripper
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func (r *StatusREST) Update(ctx api.Context, name string, objInfo rest.UpdatedOb
|
||||
}
|
||||
|
||||
// NewStorage returns a NodeStorage object that will work against nodes.
|
||||
func NewStorage(opts generic.RESTOptions, connection client.KubeletClient, proxyTransport http.RoundTripper) NodeStorage {
|
||||
func NewStorage(opts generic.RESTOptions, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
|
||||
prefix := "/" + opts.ResourcePrefix
|
||||
|
||||
newListFunc := func() runtime.Object { return &api.NodeList{} }
|
||||
@@ -109,13 +109,36 @@ func NewStorage(opts generic.RESTOptions, connection client.KubeletClient, proxy
|
||||
statusStore := *store
|
||||
statusStore.UpdateStrategy = node.StatusStrategy
|
||||
|
||||
nodeREST := &REST{store, connection, proxyTransport}
|
||||
// Set up REST handlers
|
||||
nodeREST := &REST{Store: store, proxyTransport: proxyTransport}
|
||||
statusREST := &StatusREST{store: &statusStore}
|
||||
proxyREST := &noderest.ProxyREST{Store: store, ProxyTransport: proxyTransport}
|
||||
|
||||
return NodeStorage{
|
||||
Node: nodeREST,
|
||||
Status: &StatusREST{store: &statusStore},
|
||||
Proxy: &noderest.ProxyREST{Store: store, Connection: client.ConnectionInfoGetter(nodeREST), ProxyTransport: proxyTransport},
|
||||
// Build a NodeGetter that looks up nodes using the REST handler
|
||||
nodeGetter := client.NodeGetterFunc(func(nodeName string) (*api.Node, error) {
|
||||
obj, err := nodeREST.Get(api.NewContext(), nodeName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
node, ok := obj.(*api.Node)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T", obj)
|
||||
}
|
||||
return node, nil
|
||||
})
|
||||
connectionInfoGetter, err := client.NewNodeConnectionInfoGetter(nodeGetter, kubeletClientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeREST.connection = connectionInfoGetter
|
||||
proxyREST.Connection = connectionInfoGetter
|
||||
|
||||
return &NodeStorage{
|
||||
Node: nodeREST,
|
||||
Status: statusREST,
|
||||
Proxy: proxyREST,
|
||||
KubeletConnectionInfo: connectionInfoGetter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Implement Redirector.
|
||||
@@ -123,36 +146,5 @@ var _ = rest.Redirector(&REST{})
|
||||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified node.
|
||||
func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
|
||||
return node.ResourceLocation(r, r, r.proxyTransport, ctx, id)
|
||||
}
|
||||
|
||||
var _ = client.ConnectionInfoGetter(&REST{})
|
||||
|
||||
func (r *REST) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, string, uint, http.RoundTripper, error) {
|
||||
scheme, port, transport, err := r.connection.GetRawConnectionInfo(ctx, nodeName)
|
||||
if err != nil {
|
||||
return "", "", 0, nil, err
|
||||
}
|
||||
|
||||
// We probably shouldn't care about context when looking for Node object.
|
||||
obj, err := r.Get(ctx, string(nodeName))
|
||||
if err != nil {
|
||||
return "", "", 0, nil, err
|
||||
}
|
||||
node, ok := obj.(*api.Node)
|
||||
if !ok {
|
||||
return "", "", 0, nil, fmt.Errorf("Unexpected object type: %#v", node)
|
||||
}
|
||||
|
||||
hostIP, err := nodeutil.GetNodeHostIP(node)
|
||||
if err != nil {
|
||||
return "", "", 0, nil, err
|
||||
}
|
||||
host := hostIP.String()
|
||||
|
||||
daemonPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
|
||||
if daemonPort > 0 {
|
||||
return scheme, host, uint(daemonPort), transport, nil
|
||||
}
|
||||
return scheme, host, port, transport, nil
|
||||
return node.ResourceLocation(r, r.connection, r.proxyTransport, ctx, id)
|
||||
}
|
||||
|
||||
@@ -17,31 +17,26 @@ limitations under the License.
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
type fakeConnectionInfoGetter struct {
|
||||
}
|
||||
|
||||
func (fakeConnectionInfoGetter) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
|
||||
return "http", 12345, nil, nil
|
||||
}
|
||||
|
||||
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
|
||||
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
|
||||
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
|
||||
storage := NewStorage(restOptions, fakeConnectionInfoGetter{}, nil)
|
||||
storage, err := NewStorage(restOptions, kubeletclient.KubeletClientConfig{}, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return storage.Node, server
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,6 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
@@ -29,13 +28,11 @@ import (
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
pkgstorage "k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
utilnet "k8s.io/kubernetes/pkg/util/net"
|
||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||
"k8s.io/kubernetes/pkg/util/validation/field"
|
||||
)
|
||||
|
||||
@@ -176,39 +173,23 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid node request %q", id))
|
||||
}
|
||||
|
||||
nodeObj, err := getter.Get(ctx, name)
|
||||
info, err := connection.GetConnectionInfo(ctx, types.NodeName(name))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
node := nodeObj.(*api.Node)
|
||||
hostIP, err := nodeutil.GetNodeHostIP(node)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
host := hostIP.String()
|
||||
|
||||
// We check if we want to get a default Kubelet's transport. It happens if either:
|
||||
// - no port is specified in request (Kubelet's port is default),
|
||||
// - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint,
|
||||
// - there's no information in the API about DaemonEndpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config)
|
||||
kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
|
||||
if kubeletPort == 0 {
|
||||
kubeletPort = ports.KubeletPort
|
||||
}
|
||||
if portReq == "" || strconv.Itoa(int(kubeletPort)) == portReq {
|
||||
scheme, host, port, kubeletTransport, err := connection.GetConnectionInfo(ctx, types.NodeName(node.Name))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// - no port is specified in request (Kubelet's port is default)
|
||||
// - the requested port matches the kubelet port for this node
|
||||
if portReq == "" || portReq == info.Port {
|
||||
return &url.URL{
|
||||
Scheme: scheme,
|
||||
Host: net.JoinHostPort(
|
||||
host,
|
||||
strconv.FormatUint(uint64(port), 10),
|
||||
),
|
||||
Scheme: info.Scheme,
|
||||
Host: net.JoinHostPort(info.Hostname, info.Port),
|
||||
},
|
||||
kubeletTransport,
|
||||
info.Transport,
|
||||
nil
|
||||
}
|
||||
return &url.URL{Scheme: schemeReq, Host: net.JoinHostPort(host, portReq)}, proxyTransport, nil
|
||||
|
||||
// Otherwise, return the requested scheme and port, and the proxy transport
|
||||
return &url.URL{Scheme: schemeReq, Host: net.JoinHostPort(info.Hostname, portReq)}, proxyTransport, nil
|
||||
}
|
||||
|
||||
@@ -307,7 +307,7 @@ func LogLocation(
|
||||
// If pod has not been assigned a host, return an empty location
|
||||
return nil, nil, nil
|
||||
}
|
||||
nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
|
||||
nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -334,12 +334,12 @@ func LogLocation(
|
||||
params.Add("limitBytes", strconv.FormatInt(*opts.LimitBytes, 10))
|
||||
}
|
||||
loc := &url.URL{
|
||||
Scheme: nodeScheme,
|
||||
Host: fmt.Sprintf("%s:%d", nodeHost, nodePort),
|
||||
Scheme: nodeInfo.Scheme,
|
||||
Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
|
||||
Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, pod.Name, container),
|
||||
RawQuery: params.Encode(),
|
||||
}
|
||||
return loc, nodeTransport, nil
|
||||
return loc, nodeInfo.Transport, nil
|
||||
}
|
||||
|
||||
func podHasContainerWithName(pod *api.Pod, containerName string) bool {
|
||||
@@ -458,7 +458,7 @@ func streamLocation(
|
||||
// If pod has not been assigned a host, return an empty location
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
|
||||
}
|
||||
nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
|
||||
nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -467,12 +467,12 @@ func streamLocation(
|
||||
return nil, nil, err
|
||||
}
|
||||
loc := &url.URL{
|
||||
Scheme: nodeScheme,
|
||||
Host: fmt.Sprintf("%s:%d", nodeHost, nodePort),
|
||||
Scheme: nodeInfo.Scheme,
|
||||
Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
|
||||
Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),
|
||||
RawQuery: params.Encode(),
|
||||
}
|
||||
return loc, nodeTransport, nil
|
||||
return loc, nodeInfo.Transport, nil
|
||||
}
|
||||
|
||||
// PortForwardLocation returns the port-forward URL for a pod.
|
||||
@@ -492,14 +492,14 @@ func PortForwardLocation(
|
||||
// If pod has not been assigned a host, return an empty location
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
|
||||
}
|
||||
nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
|
||||
nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
loc := &url.URL{
|
||||
Scheme: nodeScheme,
|
||||
Host: fmt.Sprintf("%s:%d", nodeHost, nodePort),
|
||||
Scheme: nodeInfo.Scheme,
|
||||
Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
|
||||
Path: fmt.Sprintf("/portForward/%s/%s", pod.Namespace, pod.Name),
|
||||
}
|
||||
return loc, nodeTransport, nil
|
||||
return loc, nodeInfo.Transport, nil
|
||||
}
|
||||
|
||||
@@ -64,9 +64,9 @@ import (
|
||||
type LegacyRESTStorageProvider struct {
|
||||
StorageFactory genericapiserver.StorageFactory
|
||||
// Used for custom proxy dialing, and proxy TLS options
|
||||
ProxyTransport http.RoundTripper
|
||||
KubeletClient kubeletclient.KubeletClient
|
||||
EventTTL time.Duration
|
||||
ProxyTransport http.RoundTripper
|
||||
KubeletClientConfig kubeletclient.KubeletClientConfig
|
||||
EventTTL time.Duration
|
||||
|
||||
// ServiceClusterIPRange is used to build cluster IPs for discovery.
|
||||
ServiceClusterIPRange *net.IPNet
|
||||
@@ -135,12 +135,15 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints")))
|
||||
restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage)
|
||||
|
||||
nodeStorage := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClient, c.ProxyTransport)
|
||||
nodeStorage, err := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClientConfig, c.ProxyTransport)
|
||||
if err != nil {
|
||||
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
|
||||
}
|
||||
restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node)
|
||||
|
||||
podStorage := podetcd.NewStorage(
|
||||
restOptionsGetter(api.Resource("pods")),
|
||||
kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
|
||||
nodeStorage.KubeletConnectionInfo,
|
||||
c.ProxyTransport,
|
||||
podDisruptionClient,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user