Refactor kubelet access and add SSL

This commit is contained in:
Haney Maxwell 2014-10-10 15:19:23 -07:00
parent f0f4092fc5
commit 3160500940
10 changed files with 120 additions and 60 deletions

View File

@ -55,7 +55,6 @@ var (
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.")
minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.")
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.")
minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.")
eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.")
@ -70,6 +69,10 @@ var (
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection")
kubeletConfig = client.KubeletConfig{
Port: 10250,
EnableHttps: false,
}
)
func init() {
@ -78,6 +81,7 @@ func init() {
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig)
}
func verifyMinionFlags() {
@ -163,9 +167,9 @@ func main() {
cloud := initCloudProvider(*cloudProvider, *cloudConfigFile)
podInfoGetter := &client.HTTPPodInfoGetter{
Client: http.DefaultClient,
Port: *minionPort,
kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
if err != nil {
glog.Fatalf("Failure to start kubelet client: %v", err)
}
// TODO: expose same flags as client.BindClientConfigFlags but for a server
@ -193,7 +197,7 @@ func main() {
MinionCacheTTL: *minionCacheTTL,
EventTTL: *eventTTL,
MinionRegexp: *minionRegexp,
PodInfoGetter: podInfoGetter,
KubeletClient: kubeletClient,
NodeResources: api.NodeResources{
Capacity: api.ResourceList{
resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU),

View File

@ -37,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
@ -58,20 +59,20 @@ var (
fakeDocker1, fakeDocker2 dockertools.FakeDockerClient
)
type fakePodInfoGetter struct{}
type fakeKubeletClient struct{}
func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
func (fakeKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
// This is a horrible hack to get around the fact that we can't provide
// different port numbers per kubelet...
var c client.PodInfoGetter
switch host {
case "localhost":
c = &client.HTTPPodInfoGetter{
c = &client.HTTPKubeletClient{
Client: http.DefaultClient,
Port: 10250,
}
case "machine":
c = &client.HTTPPodInfoGetter{
c = &client.HTTPKubeletClient{
Client: http.DefaultClient,
Port: 10251,
}
@ -81,6 +82,10 @@ func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodIn
return c.GetPodInfo("localhost", podNamespace, podID)
}
func (fakeKubeletClient) HealthCheck(host string) (health.Status, error) {
return health.Healthy, nil
}
type delegateHandler struct {
delegate http.Handler
}
@ -131,7 +136,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
Client: cl,
EtcdHelper: helper,
Minions: machineList,
PodInfoGetter: fakePodInfoGetter{},
KubeletClient: fakeKubeletClient{},
PortalNet: portalNet,
})
mux := http.NewServeMux()
@ -181,7 +186,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
// podsOnMinions returns true when all of the selected pods exist on a minion.
func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
podInfo := fakePodInfoGetter{}
podInfo := fakeKubeletClient{}
return func() (bool, error) {
for i := range pods.Items {
host, id, namespace := pods.Items[i].CurrentState.Host, pods.Items[i].Name, pods.Items[i].Namespace

View File

@ -71,8 +71,9 @@ ${GO_OUT}/apiserver \
--port="${API_PORT}" \
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
--machines="127.0.0.1" \
--minion_port=${KUBELET_PORT} \
--kubelet_port=${KUBELET_PORT} \
--portal_net="10.0.0.0/24" 1>&2 &
APISERVER_PID=$!
wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: "

View File

@ -21,6 +21,7 @@ package client
type FlagSet interface {
StringVar(p *string, name, value, usage string)
BoolVar(p *bool, name string, value bool, usage string)
UintVar(p *uint, name string, value uint, usage string)
}
// BindClientConfigFlags registers a standard set of CLI flags for connecting to a Kubernetes API server.
@ -30,5 +31,13 @@ func BindClientConfigFlags(flags FlagSet, config *Config) {
flags.BoolVar(&config.Insecure, "insecure_skip_tls_verify", config.Insecure, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.")
flags.StringVar(&config.CertFile, "client_certificate", config.CertFile, "Path to a client key file for TLS.")
flags.StringVar(&config.KeyFile, "client_key", config.KeyFile, "Path to a client key file for TLS.")
flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority")
flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.")
}
func BindKubeletClientConfigFlags(flags FlagSet, config *KubeletConfig) {
flags.BoolVar(&config.EnableHttps, "kubelet_https", config.EnableHttps, "Use https for kubelet connections")
flags.UintVar(&config.Port, "kubelet_port", config.Port, "Kubelet port")
flags.StringVar(&config.CertFile, "kubelet_client_certificate", config.CertFile, "Path to a client key file for TLS.")
flags.StringVar(&config.KeyFile, "kubelet_client_key", config.KeyFile, "Path to a client key file for TLS.")
flags.StringVar(&config.CAFile, "kubelet_certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.")
}

View File

@ -61,6 +61,19 @@ type Config struct {
Transport http.RoundTripper
}
type KubeletConfig struct {
// ToDo: Add support for different kubelet instances exposing different ports
Port uint
EnableHttps bool
// TLS Configuration, only applies if EnableHttps is true.
CertFile string
// TLS Configuration, only applies if EnableHttps is true.
KeyFile string
// TLS Configuration, only applies if EnableHttps is true.
CAFile string
}
// New creates a Kubernetes client for the given config. This client works with pods,
// replication controllers and services. It allows operations such as list, get, update
// and delete on these objects. An error is returned if the provided configuration

View File

@ -26,11 +26,23 @@ import (
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
)
// ErrPodInfoNotAvailable may be returned when the requested pod info is not available.
var ErrPodInfoNotAvailable = errors.New("no pod info available")
// KubeletClient is an interface for all kubelet functionality
type KubeletClient interface {
KubeletHealthChecker
PodInfoGetter
}
// KubeletHealthchecker is an interface for healthchecking kubelets
type KubeletHealthChecker interface {
HealthCheck(host string) (health.Status, error)
}
// PodInfoGetter is an interface for things that can get information about a pod's containers.
// Injectable for easy testing.
type PodInfoGetter interface {
@ -39,19 +51,50 @@ type PodInfoGetter interface {
GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error)
}
// HTTPPodInfoGetter is the default implementation of PodInfoGetter, accesses the kubelet over HTTP.
type HTTPPodInfoGetter struct {
Client *http.Client
Port uint
// HTTPKubeletClient is the default implementation of PodInfoGetter and KubeletHealthchecker, accesses the kubelet over HTTP.
type HTTPKubeletClient struct {
Client *http.Client
Port uint
EnableHttps bool
}
func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
transport := http.DefaultTransport
if config.CAFile != "" {
t, err := NewClientCertTLSTransport(config.CertFile, config.KeyFile, config.CAFile)
if err != nil {
return nil, err
}
transport = t
}
c := &http.Client{Transport: transport}
return &HTTPKubeletClient{
Client: c,
Port: config.Port,
EnableHttps: config.EnableHttps,
}, nil
}
func (c *HTTPKubeletClient) url(host string) string {
scheme := "http://"
if c.EnableHttps {
scheme = "https://"
}
return fmt.Sprintf(
"%s%s",
scheme,
net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)))
}
// GetPodInfo gets information about the specified pod.
func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
func (c *HTTPKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
request, err := http.NewRequest(
"GET",
fmt.Sprintf(
"http://%s/podInfo?podID=%s&podNamespace=%s",
net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)),
"%s/podInfo?podID=%s&podNamespace=%s",
c.url(host),
podID,
podNamespace),
nil)
@ -79,7 +122,11 @@ func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.Po
return info, nil
}
// FakePodInfoGetter is a fake implementation of PodInfoGetter. It is useful for testing.
func (c *HTTPKubeletClient) HealthCheck(host string) (health.Status, error) {
return health.DoHTTPCheck(fmt.Sprintf("%s/healthz", c.url(host)), c.Client)
}
// FakeKubeletClient is a fake implementation of PodInfoGetter. It is useful for testing.
type FakePodInfoGetter struct {
data api.PodInfo
err error

View File

@ -29,7 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestHTTPPodInfoGetter(t *testing.T) {
func TestHTTPKubeletClient(t *testing.T) {
expectObj := api.PodInfo{
"myID": api.ContainerStatus{},
}
@ -56,7 +56,7 @@ func TestHTTPPodInfoGetter(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
podInfoGetter := &HTTPPodInfoGetter{
podInfoGetter := &HTTPKubeletClient{
Client: http.DefaultClient,
Port: uint(port),
}
@ -71,7 +71,7 @@ func TestHTTPPodInfoGetter(t *testing.T) {
}
}
func TestHTTPPodInfoGetterNotFound(t *testing.T) {
func TestHTTPKubeletClientNotFound(t *testing.T) {
expectObj := api.PodInfo{
"myID": api.ContainerStatus{},
}
@ -98,7 +98,7 @@ func TestHTTPPodInfoGetterNotFound(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
podInfoGetter := &HTTPPodInfoGetter{
podInfoGetter := &HTTPKubeletClient{
Client: http.DefaultClient,
Port: uint(port),
}

View File

@ -18,7 +18,6 @@ package master
import (
"net"
"net/http"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -53,7 +52,7 @@ type Config struct {
MinionCacheTTL time.Duration
EventTTL time.Duration
MinionRegexp string
PodInfoGetter client.PodInfoGetter
KubeletClient client.KubeletClient
NodeResources api.NodeResources
PortalNet *net.IPNet
}
@ -110,14 +109,14 @@ func New(c *Config) *Master {
func makeMinionRegistry(c *Config) minion.Registry {
var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil)
if c.HealthCheckMinions {
minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{})
minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient)
}
return minionRegistry
}
// init initializes master.
func (m *Master) init(c *Config) {
podCache := NewPodCache(c.PodInfoGetter, m.podRegistry)
podCache := NewPodCache(c.KubeletClient, m.podRegistry)
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
if c.Cloud != nil && len(c.MinionRegexp) > 0 {
@ -136,7 +135,7 @@ func (m *Master) init(c *Config) {
"pods": pod.NewREST(&pod.RESTConfig{
CloudProvider: c.Cloud,
PodCache: podCache,
PodInfoGetter: c.PodInfoGetter,
PodInfoGetter: c.KubeletClient,
Registry: m.podRegistry,
Minions: m.client,
}),

View File

@ -17,10 +17,8 @@ limitations under the License.
package minion
import (
"fmt"
"net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/golang/glog"
@ -28,15 +26,13 @@ import (
type HealthyRegistry struct {
delegate Registry
client health.HTTPGetInterface
port int
client client.KubeletHealthChecker
}
func NewHealthyRegistry(delegate Registry, client *http.Client) Registry {
func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker) Registry {
return &HealthyRegistry{
delegate: delegate,
client: client,
port: 10250,
}
}
@ -48,7 +44,7 @@ func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Mini
if err != nil {
return nil, err
}
status, err := health.DoHTTPCheck(r.makeMinionURL(minionID), r.client)
status, err := r.client.HealthCheck(minionID)
if err != nil {
return nil, err
}
@ -73,7 +69,7 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Mini
return result, err
}
for _, minion := range list.Items {
status, err := health.DoHTTPCheck(r.makeMinionURL(minion.Name), r.client)
status, err := r.client.HealthCheck(minion.Name)
if err != nil {
glog.V(1).Infof("%#v failed health check with error: %s", minion, err)
continue
@ -86,7 +82,3 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Mini
}
return result, nil
}
func (r *HealthyRegistry) makeMinionURL(minion string) string {
return fmt.Sprintf("http://%s:%d/healthz", minion, r.port)
}

View File

@ -17,27 +17,18 @@ limitations under the License.
package minion
import (
"bytes"
"io/ioutil"
"net/http"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
type alwaysYes struct{}
func fakeHTTPResponse(status int) *http.Response {
return &http.Response{
StatusCode: status,
Body: ioutil.NopCloser(&bytes.Buffer{}),
}
}
func (alwaysYes) Get(url string) (*http.Response, error) {
return fakeHTTPResponse(http.StatusOK), nil
func (alwaysYes) HealthCheck(host string) (health.Status, error) {
return health.Healthy, nil
}
func TestBasicDelegation(t *testing.T) {
@ -80,11 +71,11 @@ type notMinion struct {
minion string
}
func (n *notMinion) Get(url string) (*http.Response, error) {
if url != "http://"+n.minion+":10250/healthz" {
return fakeHTTPResponse(http.StatusOK), nil
func (n *notMinion) HealthCheck(host string) (health.Status, error) {
if host != n.minion {
return health.Healthy, nil
} else {
return fakeHTTPResponse(http.StatusInternalServerError), nil
return health.Unhealthy, nil
}
}
@ -94,7 +85,6 @@ func TestFiltering(t *testing.T) {
healthy := HealthyRegistry{
delegate: mockMinionRegistry,
client: &notMinion{minion: "m1"},
port: 10250,
}
expected := []string{"m2", "m3"}
list, err := healthy.ListMinions(ctx)