Remove static kubelet client, refactor ConnectionInfoGetter

This commit is contained in:
Jordan Liggitt
2016-10-07 15:30:45 -04:00
parent 3f4c438946
commit a082a2e749
11 changed files with 170 additions and 231 deletions

View File

@@ -17,19 +17,17 @@ limitations under the License.
package client
import (
"errors"
"fmt"
"net"
"net/http"
"strings"
"strconv"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/transport"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
type KubeletClientConfig struct {
@@ -50,19 +48,17 @@ type KubeletClientConfig struct {
Dial func(net, addr string) (net.Conn, error)
}
// KubeletClient is an interface for all kubelet functionality
type KubeletClient interface {
GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, port uint, transport http.RoundTripper, err error)
// ConnectionInfo provides the information needed to connect to a kubelet
type ConnectionInfo struct {
Scheme string
Hostname string
Port string
Transport http.RoundTripper
}
// ConnectionInfoGetter provides ConnectionInfo for the kubelet running on a named node
type ConnectionInfoGetter interface {
GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, host string, port uint, transport http.RoundTripper, err error)
}
// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP.
type HTTPKubeletClient struct {
Client *http.Client
Config *KubeletClientConfig
GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (*ConnectionInfo, error)
}
func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
@@ -82,43 +78,6 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
return transport.HTTPWrappersForConfig(config.transportConfig(), rt)
}
// TODO: this structure is questionable, it should be using client.Config and overriding defaults.
func NewStaticKubeletClient(config *KubeletClientConfig) (KubeletClient, error) {
transport, err := MakeTransport(config)
if err != nil {
return nil, err
}
c := &http.Client{
Transport: transport,
Timeout: config.HTTPTimeout,
}
return &HTTPKubeletClient{
Client: c,
Config: config,
}, nil
}
// In default HTTPKubeletClient ctx is unused.
func (c *HTTPKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
if errs := validation.ValidateNodeName(string(nodeName), false); len(errs) != 0 {
return "", 0, nil, fmt.Errorf("invalid node name: %s", strings.Join(errs, ";"))
}
scheme := "http"
if c.Config.EnableHttps {
scheme = "https"
}
return scheme, c.Config.Port, c.Client.Transport, nil
}
// FakeKubeletClient is a fake implementation of KubeletClient which returns an error
// when called. It is useful to pass to the master in a test configuration with
// no kubelets.
type FakeKubeletClient struct{}
func (c FakeKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}
// transportConfig converts a client config to an appropriate transport config.
func (c *KubeletClientConfig) transportConfig() *transport.Config {
cfg := &transport.Config{
@@ -137,3 +96,73 @@ func (c *KubeletClientConfig) transportConfig() *transport.Config {
}
return cfg
}
// NodeGetter defines an interface for looking up a node by name
type NodeGetter interface {
Get(name string) (*api.Node, error)
}
// NodeGetterFunc allows implementing NodeGetter with a function
type NodeGetterFunc func(name string) (*api.Node, error)
func (f NodeGetterFunc) Get(name string) (*api.Node, error) {
return f(name)
}
// NodeConnectionInfoGetter obtains connection info from the status of a Node API object
type NodeConnectionInfoGetter struct {
// nodes is used to look up Node objects
nodes NodeGetter
// scheme is the scheme to use to connect to all kubelets
scheme string
// defaultPort is the port to use if no Kubelet endpoint port is recorded in the node status
defaultPort int
// transport is the transport to use to send a request to all kubelets
transport http.RoundTripper
}
func NewNodeConnectionInfoGetter(nodes NodeGetter, config KubeletClientConfig) (ConnectionInfoGetter, error) {
scheme := "http"
if config.EnableHttps {
scheme = "https"
}
transport, err := MakeTransport(&config)
if err != nil {
return nil, err
}
return &NodeConnectionInfoGetter{
nodes: nodes,
scheme: scheme,
defaultPort: int(config.Port),
transport: transport,
}, nil
}
func (k *NodeConnectionInfoGetter) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (*ConnectionInfo, error) {
node, err := k.nodes.Get(string(nodeName))
if err != nil {
return nil, err
}
// Find a kubelet-reported address, using preferred address type
hostIP, err := nodeutil.GetNodeHostIP(node)
if err != nil {
return nil, err
}
host := hostIP.String()
// Use the kubelet-reported port, if present
port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if port <= 0 {
port = k.defaultPort
}
return &ConnectionInfo{
Scheme: k.scheme,
Hostname: host,
Port: strconv.Itoa(port),
Transport: k.transport,
}, nil
}

View File

@@ -17,50 +17,17 @@ limitations under the License.
package client
import (
"encoding/json"
"net/http/httptest"
"net/url"
"testing"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/probe"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
)
func TestHTTPKubeletClient(t *testing.T) {
expectObj := probe.Success
body, err := json.Marshal(expectObj)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// Ensure a node client can be used as a NodeGetter.
// This allows anyone with a node client to easily construct a NewNodeConnectionInfoGetter.
var _ = NodeGetter(unversioned.NodeInterface(nil))
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
if _, err := url.Parse(testServer.URL); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
func TestNewKubeletClient(t *testing.T) {
config := &KubeletClientConfig{
EnableHttps: false,
}
client, err := NewStaticKubeletClient(config)
if err != nil {
t.Errorf("Error while trying to create a client: %v", err)
}
if client == nil {
t.Error("client is nil.")
}
}
func TestNewKubeletClientTLSInvalid(t *testing.T) {
func TestMakeTransportInvalid(t *testing.T) {
config := &KubeletClientConfig{
EnableHttps: true,
//Invalid certificate and key path
@@ -71,16 +38,16 @@ func TestNewKubeletClientTLSInvalid(t *testing.T) {
},
}
client, err := NewStaticKubeletClient(config)
rt, err := MakeTransport(config)
if err == nil {
t.Errorf("Expected an error")
}
if client != nil {
t.Error("client should be nil as we provided invalid cert file")
if rt != nil {
t.Error("rt should be nil as we provided invalid cert file")
}
}
func TestNewKubeletClientTLSValid(t *testing.T) {
func TestMakeTransportValid(t *testing.T) {
config := &KubeletClientConfig{
Port: 1234,
EnableHttps: true,
@@ -93,34 +60,11 @@ func TestNewKubeletClientTLSValid(t *testing.T) {
},
}
client, err := NewStaticKubeletClient(config)
rt, err := MakeTransport(config)
if err != nil {
t.Errorf("Not expecting an error #%v", err)
}
if client == nil {
t.Error("client should not be nil")
}
{
scheme, port, transport, err := client.GetRawConnectionInfo(nil, "foo")
if err != nil {
t.Errorf("Error getting info: %v", err)
}
if scheme != "https" {
t.Errorf("Expected https, got %s", scheme)
}
if port != 1234 {
t.Errorf("Expected 1234, got %d", port)
}
if transport == nil {
t.Errorf("Expected transport, got nil")
}
}
{
_, _, _, err := client.GetRawConnectionInfo(nil, "foo bar")
if err == nil {
t.Errorf("Expected error getting connection info for invalid node name, got none")
}
if rt == nil {
t.Error("rt should not be nil")
}
}