From 3160500940633a25a9cdfbec6520d4a20af9fd20 Mon Sep 17 00:00:00 2001 From: Haney Maxwell Date: Fri, 10 Oct 2014 15:19:23 -0700 Subject: [PATCH] Refactor kubelet access and add SSL --- cmd/apiserver/apiserver.go | 14 +++-- cmd/integration/integration.go | 17 +++-- hack/test-cmd.sh | 3 +- pkg/client/flags.go | 11 +++- pkg/client/helper.go | 13 ++++ pkg/client/{podinfo.go => kubelet.go} | 63 ++++++++++++++++--- .../{podinfo_test.go => kubelet_test.go} | 8 +-- pkg/master/master.go | 9 ++- pkg/registry/minion/healthy_registry.go | 18 ++---- pkg/registry/minion/healthy_registry_test.go | 24 +++---- 10 files changed, 120 insertions(+), 60 deletions(-) rename pkg/client/{podinfo.go => kubelet.go} (58%) rename pkg/client/{podinfo_test.go => kubelet_test.go} (93%) diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 137a3f15981..2c4be13f889 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -55,7 +55,6 @@ var ( cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.") - minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.") healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.") minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.") eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.") @@ -70,6 +69,10 @@ var ( nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection") + kubeletConfig = client.KubeletConfig{ + Port: 10250, + EnableHttps: false, + } ) func init() { @@ -78,6 +81,7 @@ func init() { flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.") flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.") + client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig) } func verifyMinionFlags() { @@ -163,9 +167,9 @@ func main() { cloud := initCloudProvider(*cloudProvider, *cloudConfigFile) - podInfoGetter := &client.HTTPPodInfoGetter{ - Client: http.DefaultClient, - Port: *minionPort, + kubeletClient, err := client.NewKubeletClient(&kubeletConfig) + if err != nil { + glog.Fatalf("Failure to start kubelet client: %v", err) } // TODO: expose same flags as client.BindClientConfigFlags but for a server @@ -193,7 +197,7 @@ func main() { MinionCacheTTL: *minionCacheTTL, EventTTL: *eventTTL, MinionRegexp: *minionRegexp, - PodInfoGetter: podInfoGetter, + KubeletClient: kubeletClient, NodeResources: api.NodeResources{ Capacity: api.ResourceList{ resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU), diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 0edfdb9f2e6..d1709fb5eb8 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -37,6 +37,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" @@ -58,20 +59,20 @@ var ( fakeDocker1, fakeDocker2 dockertools.FakeDockerClient ) -type fakePodInfoGetter struct{} +type fakeKubeletClient struct{} -func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) { +func (fakeKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) { // This is a horrible hack to get around the fact that we can't provide // different port numbers per kubelet... var c client.PodInfoGetter switch host { case "localhost": - c = &client.HTTPPodInfoGetter{ + c = &client.HTTPKubeletClient{ Client: http.DefaultClient, Port: 10250, } case "machine": - c = &client.HTTPPodInfoGetter{ + c = &client.HTTPKubeletClient{ Client: http.DefaultClient, Port: 10251, } @@ -81,6 +82,10 @@ func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodIn return c.GetPodInfo("localhost", podNamespace, podID) } +func (fakeKubeletClient) HealthCheck(host string) (health.Status, error) { + return health.Healthy, nil +} + type delegateHandler struct { delegate http.Handler } @@ -131,7 +136,7 @@ func startComponents(manifestURL string) (apiServerURL string) { Client: cl, EtcdHelper: helper, Minions: machineList, - PodInfoGetter: fakePodInfoGetter{}, + KubeletClient: fakeKubeletClient{}, PortalNet: portalNet, }) mux := http.NewServeMux() @@ -181,7 +186,7 @@ func startComponents(manifestURL string) (apiServerURL string) { // podsOnMinions returns true when all of the selected pods exist on a minion. func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc { - podInfo := fakePodInfoGetter{} + podInfo := fakeKubeletClient{} return func() (bool, error) { for i := range pods.Items { host, id, namespace := pods.Items[i].CurrentState.Host, pods.Items[i].Name, pods.Items[i].Namespace diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index d5fb2cd3a72..7016344acd8 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -71,8 +71,9 @@ ${GO_OUT}/apiserver \ --port="${API_PORT}" \ --etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \ --machines="127.0.0.1" \ - --minion_port=${KUBELET_PORT} \ + --kubelet_port=${KUBELET_PORT} \ --portal_net="10.0.0.0/24" 1>&2 & + APISERVER_PID=$! wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: " diff --git a/pkg/client/flags.go b/pkg/client/flags.go index cfd2356085c..074e038ec96 100644 --- a/pkg/client/flags.go +++ b/pkg/client/flags.go @@ -21,6 +21,7 @@ package client type FlagSet interface { StringVar(p *string, name, value, usage string) BoolVar(p *bool, name string, value bool, usage string) + UintVar(p *uint, name string, value uint, usage string) } // BindClientConfigFlags registers a standard set of CLI flags for connecting to a Kubernetes API server. @@ -30,5 +31,13 @@ func BindClientConfigFlags(flags FlagSet, config *Config) { flags.BoolVar(&config.Insecure, "insecure_skip_tls_verify", config.Insecure, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.") flags.StringVar(&config.CertFile, "client_certificate", config.CertFile, "Path to a client key file for TLS.") flags.StringVar(&config.KeyFile, "client_key", config.KeyFile, "Path to a client key file for TLS.") - flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority") + flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.") +} + +func BindKubeletClientConfigFlags(flags FlagSet, config *KubeletConfig) { + flags.BoolVar(&config.EnableHttps, "kubelet_https", config.EnableHttps, "Use https for kubelet connections") + flags.UintVar(&config.Port, "kubelet_port", config.Port, "Kubelet port") + flags.StringVar(&config.CertFile, "kubelet_client_certificate", config.CertFile, "Path to a client key file for TLS.") + flags.StringVar(&config.KeyFile, "kubelet_client_key", config.KeyFile, "Path to a client key file for TLS.") + flags.StringVar(&config.CAFile, "kubelet_certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.") } diff --git a/pkg/client/helper.go b/pkg/client/helper.go index 6c83f2f456b..176259014b6 100644 --- a/pkg/client/helper.go +++ b/pkg/client/helper.go @@ -61,6 +61,19 @@ type Config struct { Transport http.RoundTripper } +type KubeletConfig struct { + // ToDo: Add support for different kubelet instances exposing different ports + Port uint + EnableHttps bool + + // TLS Configuration, only applies if EnableHttps is true. + CertFile string + // TLS Configuration, only applies if EnableHttps is true. + KeyFile string + // TLS Configuration, only applies if EnableHttps is true. + CAFile string +} + // New creates a Kubernetes client for the given config. This client works with pods, // replication controllers and services. It allows operations such as list, get, update // and delete on these objects. An error is returned if the provided configuration diff --git a/pkg/client/podinfo.go b/pkg/client/kubelet.go similarity index 58% rename from pkg/client/podinfo.go rename to pkg/client/kubelet.go index 174f35738e1..3aa5399e726 100644 --- a/pkg/client/podinfo.go +++ b/pkg/client/kubelet.go @@ -26,11 +26,23 @@ import ( "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/health" ) // ErrPodInfoNotAvailable may be returned when the requested pod info is not available. var ErrPodInfoNotAvailable = errors.New("no pod info available") +// KubeletClient is an interface for all kubelet functionality +type KubeletClient interface { + KubeletHealthChecker + PodInfoGetter +} + +// KubeletHealthchecker is an interface for healthchecking kubelets +type KubeletHealthChecker interface { + HealthCheck(host string) (health.Status, error) +} + // PodInfoGetter is an interface for things that can get information about a pod's containers. // Injectable for easy testing. type PodInfoGetter interface { @@ -39,19 +51,50 @@ type PodInfoGetter interface { GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) } -// HTTPPodInfoGetter is the default implementation of PodInfoGetter, accesses the kubelet over HTTP. -type HTTPPodInfoGetter struct { - Client *http.Client - Port uint +// HTTPKubeletClient is the default implementation of PodInfoGetter and KubeletHealthchecker, accesses the kubelet over HTTP. +type HTTPKubeletClient struct { + Client *http.Client + Port uint + EnableHttps bool +} + +func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { + transport := http.DefaultTransport + if config.CAFile != "" { + t, err := NewClientCertTLSTransport(config.CertFile, config.KeyFile, config.CAFile) + if err != nil { + return nil, err + } + transport = t + } + + c := &http.Client{Transport: transport} + return &HTTPKubeletClient{ + Client: c, + Port: config.Port, + EnableHttps: config.EnableHttps, + }, nil +} + +func (c *HTTPKubeletClient) url(host string) string { + scheme := "http://" + if c.EnableHttps { + scheme = "https://" + } + + return fmt.Sprintf( + "%s%s", + scheme, + net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10))) } // GetPodInfo gets information about the specified pod. -func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) { +func (c *HTTPKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) { request, err := http.NewRequest( "GET", fmt.Sprintf( - "http://%s/podInfo?podID=%s&podNamespace=%s", - net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)), + "%s/podInfo?podID=%s&podNamespace=%s", + c.url(host), podID, podNamespace), nil) @@ -79,7 +122,11 @@ func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.Po return info, nil } -// FakePodInfoGetter is a fake implementation of PodInfoGetter. It is useful for testing. +func (c *HTTPKubeletClient) HealthCheck(host string) (health.Status, error) { + return health.DoHTTPCheck(fmt.Sprintf("%s/healthz", c.url(host)), c.Client) +} + +// FakeKubeletClient is a fake implementation of PodInfoGetter. It is useful for testing. type FakePodInfoGetter struct { data api.PodInfo err error diff --git a/pkg/client/podinfo_test.go b/pkg/client/kubelet_test.go similarity index 93% rename from pkg/client/podinfo_test.go rename to pkg/client/kubelet_test.go index 8ca2e006aca..c2ef815074e 100644 --- a/pkg/client/podinfo_test.go +++ b/pkg/client/kubelet_test.go @@ -29,7 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func TestHTTPPodInfoGetter(t *testing.T) { +func TestHTTPKubeletClient(t *testing.T) { expectObj := api.PodInfo{ "myID": api.ContainerStatus{}, } @@ -56,7 +56,7 @@ func TestHTTPPodInfoGetter(t *testing.T) { t.Errorf("unexpected error: %v", err) } - podInfoGetter := &HTTPPodInfoGetter{ + podInfoGetter := &HTTPKubeletClient{ Client: http.DefaultClient, Port: uint(port), } @@ -71,7 +71,7 @@ func TestHTTPPodInfoGetter(t *testing.T) { } } -func TestHTTPPodInfoGetterNotFound(t *testing.T) { +func TestHTTPKubeletClientNotFound(t *testing.T) { expectObj := api.PodInfo{ "myID": api.ContainerStatus{}, } @@ -98,7 +98,7 @@ func TestHTTPPodInfoGetterNotFound(t *testing.T) { t.Errorf("unexpected error: %v", err) } - podInfoGetter := &HTTPPodInfoGetter{ + podInfoGetter := &HTTPKubeletClient{ Client: http.DefaultClient, Port: uint(port), } diff --git a/pkg/master/master.go b/pkg/master/master.go index fdc02ce49b2..538e8341acc 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -18,7 +18,6 @@ package master import ( "net" - "net/http" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -53,7 +52,7 @@ type Config struct { MinionCacheTTL time.Duration EventTTL time.Duration MinionRegexp string - PodInfoGetter client.PodInfoGetter + KubeletClient client.KubeletClient NodeResources api.NodeResources PortalNet *net.IPNet } @@ -110,14 +109,14 @@ func New(c *Config) *Master { func makeMinionRegistry(c *Config) minion.Registry { var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil) if c.HealthCheckMinions { - minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{}) + minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient) } return minionRegistry } // init initializes master. func (m *Master) init(c *Config) { - podCache := NewPodCache(c.PodInfoGetter, m.podRegistry) + podCache := NewPodCache(c.KubeletClient, m.podRegistry) go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30) if c.Cloud != nil && len(c.MinionRegexp) > 0 { @@ -136,7 +135,7 @@ func (m *Master) init(c *Config) { "pods": pod.NewREST(&pod.RESTConfig{ CloudProvider: c.Cloud, PodCache: podCache, - PodInfoGetter: c.PodInfoGetter, + PodInfoGetter: c.KubeletClient, Registry: m.podRegistry, Minions: m.client, }), diff --git a/pkg/registry/minion/healthy_registry.go b/pkg/registry/minion/healthy_registry.go index 39f339fe82e..6c436029b29 100644 --- a/pkg/registry/minion/healthy_registry.go +++ b/pkg/registry/minion/healthy_registry.go @@ -17,10 +17,8 @@ limitations under the License. package minion import ( - "fmt" - "net/http" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/golang/glog" @@ -28,15 +26,13 @@ import ( type HealthyRegistry struct { delegate Registry - client health.HTTPGetInterface - port int + client client.KubeletHealthChecker } -func NewHealthyRegistry(delegate Registry, client *http.Client) Registry { +func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker) Registry { return &HealthyRegistry{ delegate: delegate, client: client, - port: 10250, } } @@ -48,7 +44,7 @@ func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Mini if err != nil { return nil, err } - status, err := health.DoHTTPCheck(r.makeMinionURL(minionID), r.client) + status, err := r.client.HealthCheck(minionID) if err != nil { return nil, err } @@ -73,7 +69,7 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Mini return result, err } for _, minion := range list.Items { - status, err := health.DoHTTPCheck(r.makeMinionURL(minion.Name), r.client) + status, err := r.client.HealthCheck(minion.Name) if err != nil { glog.V(1).Infof("%#v failed health check with error: %s", minion, err) continue @@ -86,7 +82,3 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Mini } return result, nil } - -func (r *HealthyRegistry) makeMinionURL(minion string) string { - return fmt.Sprintf("http://%s:%d/healthz", minion, r.port) -} diff --git a/pkg/registry/minion/healthy_registry_test.go b/pkg/registry/minion/healthy_registry_test.go index 838ea0d5019..b3c5fd6b026 100644 --- a/pkg/registry/minion/healthy_registry_test.go +++ b/pkg/registry/minion/healthy_registry_test.go @@ -17,27 +17,18 @@ limitations under the License. package minion import ( - "bytes" - "io/ioutil" - "net/http" "reflect" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) type alwaysYes struct{} -func fakeHTTPResponse(status int) *http.Response { - return &http.Response{ - StatusCode: status, - Body: ioutil.NopCloser(&bytes.Buffer{}), - } -} - -func (alwaysYes) Get(url string) (*http.Response, error) { - return fakeHTTPResponse(http.StatusOK), nil +func (alwaysYes) HealthCheck(host string) (health.Status, error) { + return health.Healthy, nil } func TestBasicDelegation(t *testing.T) { @@ -80,11 +71,11 @@ type notMinion struct { minion string } -func (n *notMinion) Get(url string) (*http.Response, error) { - if url != "http://"+n.minion+":10250/healthz" { - return fakeHTTPResponse(http.StatusOK), nil +func (n *notMinion) HealthCheck(host string) (health.Status, error) { + if host != n.minion { + return health.Healthy, nil } else { - return fakeHTTPResponse(http.StatusInternalServerError), nil + return health.Unhealthy, nil } } @@ -94,7 +85,6 @@ func TestFiltering(t *testing.T) { healthy := HealthyRegistry{ delegate: mockMinionRegistry, client: ¬Minion{minion: "m1"}, - port: 10250, } expected := []string{"m2", "m3"} list, err := healthy.ListMinions(ctx)