mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 18:24:07 +00:00
Merge pull request #1746 from hmrm/refactor-kubelet-access
Refactor kubelet access and add SSL
This commit is contained in:
commit
dd8c49fc47
@ -55,7 +55,6 @@ var (
|
|||||||
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
|
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
|
||||||
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
|
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
|
||||||
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.")
|
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.")
|
||||||
minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.")
|
|
||||||
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.")
|
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.")
|
||||||
minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.")
|
minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.")
|
||||||
eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.")
|
eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.")
|
||||||
@ -70,6 +69,10 @@ var (
|
|||||||
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
|
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
|
||||||
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
|
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
|
||||||
enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection")
|
enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection")
|
||||||
|
kubeletConfig = client.KubeletConfig{
|
||||||
|
Port: 10250,
|
||||||
|
EnableHttps: false,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -78,6 +81,7 @@ func init() {
|
|||||||
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
|
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
|
||||||
flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
|
flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
|
||||||
flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
|
flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
|
||||||
|
client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyMinionFlags() {
|
func verifyMinionFlags() {
|
||||||
@ -163,9 +167,9 @@ func main() {
|
|||||||
|
|
||||||
cloud := initCloudProvider(*cloudProvider, *cloudConfigFile)
|
cloud := initCloudProvider(*cloudProvider, *cloudConfigFile)
|
||||||
|
|
||||||
podInfoGetter := &client.HTTPPodInfoGetter{
|
kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
|
||||||
Client: http.DefaultClient,
|
if err != nil {
|
||||||
Port: *minionPort,
|
glog.Fatalf("Failure to start kubelet client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: expose same flags as client.BindClientConfigFlags but for a server
|
// TODO: expose same flags as client.BindClientConfigFlags but for a server
|
||||||
@ -193,7 +197,7 @@ func main() {
|
|||||||
MinionCacheTTL: *minionCacheTTL,
|
MinionCacheTTL: *minionCacheTTL,
|
||||||
EventTTL: *eventTTL,
|
EventTTL: *eventTTL,
|
||||||
MinionRegexp: *minionRegexp,
|
MinionRegexp: *minionRegexp,
|
||||||
PodInfoGetter: podInfoGetter,
|
KubeletClient: kubeletClient,
|
||||||
NodeResources: api.NodeResources{
|
NodeResources: api.NodeResources{
|
||||||
Capacity: api.ResourceList{
|
Capacity: api.ResourceList{
|
||||||
resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU),
|
resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU),
|
||||||
|
@ -37,6 +37,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
@ -58,20 +59,20 @@ var (
|
|||||||
fakeDocker1, fakeDocker2 dockertools.FakeDockerClient
|
fakeDocker1, fakeDocker2 dockertools.FakeDockerClient
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakePodInfoGetter struct{}
|
type fakeKubeletClient struct{}
|
||||||
|
|
||||||
func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
|
func (fakeKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
|
||||||
// This is a horrible hack to get around the fact that we can't provide
|
// This is a horrible hack to get around the fact that we can't provide
|
||||||
// different port numbers per kubelet...
|
// different port numbers per kubelet...
|
||||||
var c client.PodInfoGetter
|
var c client.PodInfoGetter
|
||||||
switch host {
|
switch host {
|
||||||
case "localhost":
|
case "localhost":
|
||||||
c = &client.HTTPPodInfoGetter{
|
c = &client.HTTPKubeletClient{
|
||||||
Client: http.DefaultClient,
|
Client: http.DefaultClient,
|
||||||
Port: 10250,
|
Port: 10250,
|
||||||
}
|
}
|
||||||
case "machine":
|
case "machine":
|
||||||
c = &client.HTTPPodInfoGetter{
|
c = &client.HTTPKubeletClient{
|
||||||
Client: http.DefaultClient,
|
Client: http.DefaultClient,
|
||||||
Port: 10251,
|
Port: 10251,
|
||||||
}
|
}
|
||||||
@ -81,6 +82,10 @@ func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodIn
|
|||||||
return c.GetPodInfo("localhost", podNamespace, podID)
|
return c.GetPodInfo("localhost", podNamespace, podID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fakeKubeletClient) HealthCheck(host string) (health.Status, error) {
|
||||||
|
return health.Healthy, nil
|
||||||
|
}
|
||||||
|
|
||||||
type delegateHandler struct {
|
type delegateHandler struct {
|
||||||
delegate http.Handler
|
delegate http.Handler
|
||||||
}
|
}
|
||||||
@ -131,7 +136,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
|||||||
Client: cl,
|
Client: cl,
|
||||||
EtcdHelper: helper,
|
EtcdHelper: helper,
|
||||||
Minions: machineList,
|
Minions: machineList,
|
||||||
PodInfoGetter: fakePodInfoGetter{},
|
KubeletClient: fakeKubeletClient{},
|
||||||
PortalNet: portalNet,
|
PortalNet: portalNet,
|
||||||
})
|
})
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
@ -181,7 +186,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
|||||||
|
|
||||||
// podsOnMinions returns true when all of the selected pods exist on a minion.
|
// podsOnMinions returns true when all of the selected pods exist on a minion.
|
||||||
func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
|
func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
|
||||||
podInfo := fakePodInfoGetter{}
|
podInfo := fakeKubeletClient{}
|
||||||
return func() (bool, error) {
|
return func() (bool, error) {
|
||||||
for i := range pods.Items {
|
for i := range pods.Items {
|
||||||
host, id, namespace := pods.Items[i].CurrentState.Host, pods.Items[i].Name, pods.Items[i].Namespace
|
host, id, namespace := pods.Items[i].CurrentState.Host, pods.Items[i].Name, pods.Items[i].Namespace
|
||||||
|
@ -71,8 +71,9 @@ ${GO_OUT}/apiserver \
|
|||||||
--port="${API_PORT}" \
|
--port="${API_PORT}" \
|
||||||
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
|
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
|
||||||
--machines="127.0.0.1" \
|
--machines="127.0.0.1" \
|
||||||
--minion_port=${KUBELET_PORT} \
|
--kubelet_port=${KUBELET_PORT} \
|
||||||
--portal_net="10.0.0.0/24" 1>&2 &
|
--portal_net="10.0.0.0/24" 1>&2 &
|
||||||
|
|
||||||
APISERVER_PID=$!
|
APISERVER_PID=$!
|
||||||
|
|
||||||
wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: "
|
wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: "
|
||||||
|
@ -21,6 +21,7 @@ package client
|
|||||||
type FlagSet interface {
|
type FlagSet interface {
|
||||||
StringVar(p *string, name, value, usage string)
|
StringVar(p *string, name, value, usage string)
|
||||||
BoolVar(p *bool, name string, value bool, usage string)
|
BoolVar(p *bool, name string, value bool, usage string)
|
||||||
|
UintVar(p *uint, name string, value uint, usage string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BindClientConfigFlags registers a standard set of CLI flags for connecting to a Kubernetes API server.
|
// BindClientConfigFlags registers a standard set of CLI flags for connecting to a Kubernetes API server.
|
||||||
@ -30,5 +31,13 @@ func BindClientConfigFlags(flags FlagSet, config *Config) {
|
|||||||
flags.BoolVar(&config.Insecure, "insecure_skip_tls_verify", config.Insecure, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.")
|
flags.BoolVar(&config.Insecure, "insecure_skip_tls_verify", config.Insecure, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.")
|
||||||
flags.StringVar(&config.CertFile, "client_certificate", config.CertFile, "Path to a client key file for TLS.")
|
flags.StringVar(&config.CertFile, "client_certificate", config.CertFile, "Path to a client key file for TLS.")
|
||||||
flags.StringVar(&config.KeyFile, "client_key", config.KeyFile, "Path to a client key file for TLS.")
|
flags.StringVar(&config.KeyFile, "client_key", config.KeyFile, "Path to a client key file for TLS.")
|
||||||
flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority")
|
flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func BindKubeletClientConfigFlags(flags FlagSet, config *KubeletConfig) {
|
||||||
|
flags.BoolVar(&config.EnableHttps, "kubelet_https", config.EnableHttps, "Use https for kubelet connections")
|
||||||
|
flags.UintVar(&config.Port, "kubelet_port", config.Port, "Kubelet port")
|
||||||
|
flags.StringVar(&config.CertFile, "kubelet_client_certificate", config.CertFile, "Path to a client key file for TLS.")
|
||||||
|
flags.StringVar(&config.KeyFile, "kubelet_client_key", config.KeyFile, "Path to a client key file for TLS.")
|
||||||
|
flags.StringVar(&config.CAFile, "kubelet_certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.")
|
||||||
}
|
}
|
||||||
|
@ -61,6 +61,19 @@ type Config struct {
|
|||||||
Transport http.RoundTripper
|
Transport http.RoundTripper
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type KubeletConfig struct {
|
||||||
|
// ToDo: Add support for different kubelet instances exposing different ports
|
||||||
|
Port uint
|
||||||
|
EnableHttps bool
|
||||||
|
|
||||||
|
// TLS Configuration, only applies if EnableHttps is true.
|
||||||
|
CertFile string
|
||||||
|
// TLS Configuration, only applies if EnableHttps is true.
|
||||||
|
KeyFile string
|
||||||
|
// TLS Configuration, only applies if EnableHttps is true.
|
||||||
|
CAFile string
|
||||||
|
}
|
||||||
|
|
||||||
// New creates a Kubernetes client for the given config. This client works with pods,
|
// New creates a Kubernetes client for the given config. This client works with pods,
|
||||||
// replication controllers and services. It allows operations such as list, get, update
|
// replication controllers and services. It allows operations such as list, get, update
|
||||||
// and delete on these objects. An error is returned if the provided configuration
|
// and delete on these objects. An error is returned if the provided configuration
|
||||||
|
@ -26,11 +26,23 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrPodInfoNotAvailable may be returned when the requested pod info is not available.
|
// ErrPodInfoNotAvailable may be returned when the requested pod info is not available.
|
||||||
var ErrPodInfoNotAvailable = errors.New("no pod info available")
|
var ErrPodInfoNotAvailable = errors.New("no pod info available")
|
||||||
|
|
||||||
|
// KubeletClient is an interface for all kubelet functionality
|
||||||
|
type KubeletClient interface {
|
||||||
|
KubeletHealthChecker
|
||||||
|
PodInfoGetter
|
||||||
|
}
|
||||||
|
|
||||||
|
// KubeletHealthchecker is an interface for healthchecking kubelets
|
||||||
|
type KubeletHealthChecker interface {
|
||||||
|
HealthCheck(host string) (health.Status, error)
|
||||||
|
}
|
||||||
|
|
||||||
// PodInfoGetter is an interface for things that can get information about a pod's containers.
|
// PodInfoGetter is an interface for things that can get information about a pod's containers.
|
||||||
// Injectable for easy testing.
|
// Injectable for easy testing.
|
||||||
type PodInfoGetter interface {
|
type PodInfoGetter interface {
|
||||||
@ -39,19 +51,50 @@ type PodInfoGetter interface {
|
|||||||
GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error)
|
GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HTTPPodInfoGetter is the default implementation of PodInfoGetter, accesses the kubelet over HTTP.
|
// HTTPKubeletClient is the default implementation of PodInfoGetter and KubeletHealthchecker, accesses the kubelet over HTTP.
|
||||||
type HTTPPodInfoGetter struct {
|
type HTTPKubeletClient struct {
|
||||||
Client *http.Client
|
Client *http.Client
|
||||||
Port uint
|
Port uint
|
||||||
|
EnableHttps bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
|
||||||
|
transport := http.DefaultTransport
|
||||||
|
if config.CAFile != "" {
|
||||||
|
t, err := NewClientCertTLSTransport(config.CertFile, config.KeyFile, config.CAFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
transport = t
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &http.Client{Transport: transport}
|
||||||
|
return &HTTPKubeletClient{
|
||||||
|
Client: c,
|
||||||
|
Port: config.Port,
|
||||||
|
EnableHttps: config.EnableHttps,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *HTTPKubeletClient) url(host string) string {
|
||||||
|
scheme := "http://"
|
||||||
|
if c.EnableHttps {
|
||||||
|
scheme = "https://"
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"%s%s",
|
||||||
|
scheme,
|
||||||
|
net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPodInfo gets information about the specified pod.
|
// GetPodInfo gets information about the specified pod.
|
||||||
func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
|
func (c *HTTPKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
|
||||||
request, err := http.NewRequest(
|
request, err := http.NewRequest(
|
||||||
"GET",
|
"GET",
|
||||||
fmt.Sprintf(
|
fmt.Sprintf(
|
||||||
"http://%s/podInfo?podID=%s&podNamespace=%s",
|
"%s/podInfo?podID=%s&podNamespace=%s",
|
||||||
net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)),
|
c.url(host),
|
||||||
podID,
|
podID,
|
||||||
podNamespace),
|
podNamespace),
|
||||||
nil)
|
nil)
|
||||||
@ -79,7 +122,11 @@ func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.Po
|
|||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FakePodInfoGetter is a fake implementation of PodInfoGetter. It is useful for testing.
|
func (c *HTTPKubeletClient) HealthCheck(host string) (health.Status, error) {
|
||||||
|
return health.DoHTTPCheck(fmt.Sprintf("%s/healthz", c.url(host)), c.Client)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FakeKubeletClient is a fake implementation of PodInfoGetter. It is useful for testing.
|
||||||
type FakePodInfoGetter struct {
|
type FakePodInfoGetter struct {
|
||||||
data api.PodInfo
|
data api.PodInfo
|
||||||
err error
|
err error
|
@ -29,7 +29,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestHTTPPodInfoGetter(t *testing.T) {
|
func TestHTTPKubeletClient(t *testing.T) {
|
||||||
expectObj := api.PodInfo{
|
expectObj := api.PodInfo{
|
||||||
"myID": api.ContainerStatus{},
|
"myID": api.ContainerStatus{},
|
||||||
}
|
}
|
||||||
@ -56,7 +56,7 @@ func TestHTTPPodInfoGetter(t *testing.T) {
|
|||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
podInfoGetter := &HTTPPodInfoGetter{
|
podInfoGetter := &HTTPKubeletClient{
|
||||||
Client: http.DefaultClient,
|
Client: http.DefaultClient,
|
||||||
Port: uint(port),
|
Port: uint(port),
|
||||||
}
|
}
|
||||||
@ -71,7 +71,7 @@ func TestHTTPPodInfoGetter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHTTPPodInfoGetterNotFound(t *testing.T) {
|
func TestHTTPKubeletClientNotFound(t *testing.T) {
|
||||||
expectObj := api.PodInfo{
|
expectObj := api.PodInfo{
|
||||||
"myID": api.ContainerStatus{},
|
"myID": api.ContainerStatus{},
|
||||||
}
|
}
|
||||||
@ -98,7 +98,7 @@ func TestHTTPPodInfoGetterNotFound(t *testing.T) {
|
|||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
podInfoGetter := &HTTPPodInfoGetter{
|
podInfoGetter := &HTTPKubeletClient{
|
||||||
Client: http.DefaultClient,
|
Client: http.DefaultClient,
|
||||||
Port: uint(port),
|
Port: uint(port),
|
||||||
}
|
}
|
@ -18,7 +18,6 @@ package master
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
@ -53,7 +52,7 @@ type Config struct {
|
|||||||
MinionCacheTTL time.Duration
|
MinionCacheTTL time.Duration
|
||||||
EventTTL time.Duration
|
EventTTL time.Duration
|
||||||
MinionRegexp string
|
MinionRegexp string
|
||||||
PodInfoGetter client.PodInfoGetter
|
KubeletClient client.KubeletClient
|
||||||
NodeResources api.NodeResources
|
NodeResources api.NodeResources
|
||||||
PortalNet *net.IPNet
|
PortalNet *net.IPNet
|
||||||
}
|
}
|
||||||
@ -110,14 +109,14 @@ func New(c *Config) *Master {
|
|||||||
func makeMinionRegistry(c *Config) minion.Registry {
|
func makeMinionRegistry(c *Config) minion.Registry {
|
||||||
var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil)
|
var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil)
|
||||||
if c.HealthCheckMinions {
|
if c.HealthCheckMinions {
|
||||||
minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{})
|
minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient)
|
||||||
}
|
}
|
||||||
return minionRegistry
|
return minionRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
// init initializes master.
|
// init initializes master.
|
||||||
func (m *Master) init(c *Config) {
|
func (m *Master) init(c *Config) {
|
||||||
podCache := NewPodCache(c.PodInfoGetter, m.podRegistry)
|
podCache := NewPodCache(c.KubeletClient, m.podRegistry)
|
||||||
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
|
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
|
||||||
|
|
||||||
if c.Cloud != nil && len(c.MinionRegexp) > 0 {
|
if c.Cloud != nil && len(c.MinionRegexp) > 0 {
|
||||||
@ -136,7 +135,7 @@ func (m *Master) init(c *Config) {
|
|||||||
"pods": pod.NewREST(&pod.RESTConfig{
|
"pods": pod.NewREST(&pod.RESTConfig{
|
||||||
CloudProvider: c.Cloud,
|
CloudProvider: c.Cloud,
|
||||||
PodCache: podCache,
|
PodCache: podCache,
|
||||||
PodInfoGetter: c.PodInfoGetter,
|
PodInfoGetter: c.KubeletClient,
|
||||||
Registry: m.podRegistry,
|
Registry: m.podRegistry,
|
||||||
Minions: m.client,
|
Minions: m.client,
|
||||||
}),
|
}),
|
||||||
|
@ -17,10 +17,8 @@ limitations under the License.
|
|||||||
package minion
|
package minion
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -28,15 +26,13 @@ import (
|
|||||||
|
|
||||||
type HealthyRegistry struct {
|
type HealthyRegistry struct {
|
||||||
delegate Registry
|
delegate Registry
|
||||||
client health.HTTPGetInterface
|
client client.KubeletHealthChecker
|
||||||
port int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHealthyRegistry(delegate Registry, client *http.Client) Registry {
|
func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker) Registry {
|
||||||
return &HealthyRegistry{
|
return &HealthyRegistry{
|
||||||
delegate: delegate,
|
delegate: delegate,
|
||||||
client: client,
|
client: client,
|
||||||
port: 10250,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,7 +44,7 @@ func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Mini
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
status, err := health.DoHTTPCheck(r.makeMinionURL(minionID), r.client)
|
status, err := r.client.HealthCheck(minionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -73,7 +69,7 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Mini
|
|||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
for _, minion := range list.Items {
|
for _, minion := range list.Items {
|
||||||
status, err := health.DoHTTPCheck(r.makeMinionURL(minion.Name), r.client)
|
status, err := r.client.HealthCheck(minion.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(1).Infof("%#v failed health check with error: %s", minion, err)
|
glog.V(1).Infof("%#v failed health check with error: %s", minion, err)
|
||||||
continue
|
continue
|
||||||
@ -86,7 +82,3 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Mini
|
|||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *HealthyRegistry) makeMinionURL(minion string) string {
|
|
||||||
return fmt.Sprintf("http://%s:%d/healthz", minion, r.port)
|
|
||||||
}
|
|
||||||
|
@ -17,27 +17,18 @@ limitations under the License.
|
|||||||
package minion
|
package minion
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
||||||
)
|
)
|
||||||
|
|
||||||
type alwaysYes struct{}
|
type alwaysYes struct{}
|
||||||
|
|
||||||
func fakeHTTPResponse(status int) *http.Response {
|
func (alwaysYes) HealthCheck(host string) (health.Status, error) {
|
||||||
return &http.Response{
|
return health.Healthy, nil
|
||||||
StatusCode: status,
|
|
||||||
Body: ioutil.NopCloser(&bytes.Buffer{}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (alwaysYes) Get(url string) (*http.Response, error) {
|
|
||||||
return fakeHTTPResponse(http.StatusOK), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBasicDelegation(t *testing.T) {
|
func TestBasicDelegation(t *testing.T) {
|
||||||
@ -80,11 +71,11 @@ type notMinion struct {
|
|||||||
minion string
|
minion string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *notMinion) Get(url string) (*http.Response, error) {
|
func (n *notMinion) HealthCheck(host string) (health.Status, error) {
|
||||||
if url != "http://"+n.minion+":10250/healthz" {
|
if host != n.minion {
|
||||||
return fakeHTTPResponse(http.StatusOK), nil
|
return health.Healthy, nil
|
||||||
} else {
|
} else {
|
||||||
return fakeHTTPResponse(http.StatusInternalServerError), nil
|
return health.Unhealthy, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,7 +85,6 @@ func TestFiltering(t *testing.T) {
|
|||||||
healthy := HealthyRegistry{
|
healthy := HealthyRegistry{
|
||||||
delegate: mockMinionRegistry,
|
delegate: mockMinionRegistry,
|
||||||
client: ¬Minion{minion: "m1"},
|
client: ¬Minion{minion: "m1"},
|
||||||
port: 10250,
|
|
||||||
}
|
}
|
||||||
expected := []string{"m2", "m3"}
|
expected := []string{"m2", "m3"}
|
||||||
list, err := healthy.ListMinions(ctx)
|
list, err := healthy.ListMinions(ctx)
|
||||||
|
Loading…
Reference in New Issue
Block a user