diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index f6d0b725f78..6166202befc 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -85,6 +85,15 @@ func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec, canonicalP }} } +func InstallValidator(mux Mux, servers map[string]Server) { + validator, err := NewValidator(servers) + if err != nil { + glog.Errorf("failed to set up validator: %v", err) + return + } + mux.Handle("/validate", validator) +} + // InstallREST registers the REST handlers (storage, watch, and operations) into a mux. // It is expected that the provided prefix will serve all operations. Path MUST NOT end // in a slash. @@ -99,16 +108,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) { redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec} opHandler := &OperationHandler{g.handler.ops, g.handler.codec} - servers := map[string]string{ - "controller-manager": "127.0.0.1:10252", - "scheduler": "127.0.0.1:10251", - // TODO: Add minion health checks here too. - } - validator, err := NewValidator(servers) - if err != nil { - glog.Errorf("failed to set up validator: %v", err) - validator = nil - } for _, prefix := range paths { prefix = strings.TrimRight(prefix, "/") proxyHandler := &ProxyHandler{prefix + "/proxy/", g.handler.storage, g.handler.codec} @@ -119,9 +118,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) { mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler)) mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler)) mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler)) - if validator != nil { - mux.Handle(prefix+"/validate", validator) - } } } diff --git a/pkg/apiserver/validator.go b/pkg/apiserver/validator.go index 4926f1a92a5..d8a918e6b38 100644 --- a/pkg/apiserver/validator.go +++ b/pkg/apiserver/validator.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "net" "net/http" "strconv" @@ -33,20 +32,21 @@ type httpGet interface { Get(url string) (*http.Response, error) } -type server struct { - addr string - port int +type Server struct { + Addr string + Port int + Path string } // validator is responsible for validating the cluster and serving type validator struct { // a list of servers to health check - servers map[string]server + servers map[string]Server client httpGet } -func (s *server) check(client httpGet) (health.Status, string, error) { - resp, err := client.Get("http://" + net.JoinHostPort(s.addr, strconv.Itoa(s.port)) + "/healthz") +func (s *Server) check(client httpGet) (health.Status, string, error) { + resp, err := client.Get("http://" + net.JoinHostPort(s.Addr, strconv.Itoa(s.Port)) + s.Path) if err != nil { return health.Unknown, "", err } @@ -82,8 +82,7 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) { } reply = append(reply, ServerStatus{name, status.String(), status, msg, errorMsg}) } - data, err := json.Marshal(reply) - log.Printf("FOO: %s", string(data)) + data, err := json.MarshalIndent(reply, "", " ") if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) @@ -94,8 +93,15 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // NewValidator creates a validator for a set of servers. -func NewValidator(servers map[string]string) (http.Handler, error) { - result := map[string]server{} +func NewValidator(servers map[string]Server) (http.Handler, error) { + return &validator{ + servers: servers, + client: &http.Client{}, + }, nil +} + +func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) { + result := map[string]Server{} for name, value := range servers { host, port, err := net.SplitHostPort(value) if err != nil { @@ -105,16 +111,10 @@ func NewValidator(servers map[string]string) (http.Handler, error) { if err != nil { return nil, fmt.Errorf("invalid server spec: %s (%v)", port, err) } - result[name] = server{host, val} + result[name] = Server{Addr: host, Port: val, Path: "/healthz"} } - return &validator{ - servers: result, - client: &http.Client{}, - }, nil -} -func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) { - v, e := NewValidator(servers) + v, e := NewValidator(result) if e == nil { v.(*validator).client = get } diff --git a/pkg/apiserver/validator_test.go b/pkg/apiserver/validator_test.go index 9c4bcf1d38d..ee4d5f03b86 100644 --- a/pkg/apiserver/validator_test.go +++ b/pkg/apiserver/validator_test.go @@ -63,12 +63,12 @@ func TestValidate(t *testing.T) { {nil, "foo", health.Unhealthy, 500, true}, } - s := server{addr: "foo.com", port: 8080} + s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"} for _, test := range tests { fake := makeFake(test.data, test.code, test.err) status, data, err := s.check(fake) - expect := fmt.Sprintf("http://%s:%d/healthz", s.addr, s.port) + expect := fmt.Sprintf("http://%s:%d/healthz", s.Addr, s.Port) if fake.url != expect { t.Errorf("expected %s, got %s", expect, fake.url) } diff --git a/pkg/master/master.go b/pkg/master/master.go index f13a33b12bc..b79671310d4 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -17,12 +17,15 @@ limitations under the License. package master import ( + "fmt" "net" "net/http" + "net/url" "strconv" "strings" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" @@ -306,6 +309,9 @@ func (m *Master) init(c *Config) { versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2") m.mux.Handle(c.APIPrefix, versionHandler) apiserver.InstallSupport(m.mux) + serversToValidate := m.getServersToValidate(c) + + apiserver.InstallValidator(m.mux, serversToValidate) if c.EnableLogsSupport { apiserver.InstallLogsSupport(m.mux) } @@ -340,6 +346,43 @@ func (m *Master) init(c *Config) { m.masterServices.Start() } +func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server { + serversToValidate := map[string]apiserver.Server{ + "controller-manager": {Addr: "127.0.0.1", Port: 10252, Path: "/healthz"}, + "scheduler": {Addr: "127.0.0.1", Port: 10251, Path: "/healthz"}, + } + for ix, machine := range c.EtcdHelper.Client.GetCluster() { + etcdUrl, err := url.Parse(machine) + if err != nil { + glog.Errorf("Failed to parse etcd url for validation: %v", err) + continue + } + var port int + var addr string + if strings.Contains(etcdUrl.Host, ":") { + var portString string + addr, portString, err = net.SplitHostPort(etcdUrl.Host) + if err != nil { + glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err) + continue + } + port, _ = strconv.Atoi(portString) + } else { + addr = etcdUrl.Host + port = 4001 + } + serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/v2/keys/"} + } + nodes, err := m.minionRegistry.ListMinions(api.NewDefaultContext()) + if err != nil { + glog.Errorf("Failed to list minions: %v", err) + } + for ix, node := range nodes.Items { + serversToValidate[fmt.Sprintf("node-%d", ix)] = apiserver.Server{Addr: node.HostIP, Port: 10250, Path: "/healthz"} + } + return serversToValidate +} + // API_v1beta1 returns the resources and codec for API version v1beta1. func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) { storage := make(map[string]apiserver.RESTStorage) diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go new file mode 100644 index 00000000000..77492d7c616 --- /dev/null +++ b/pkg/master/master_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" +) + +func TestGetServersToValidate(t *testing.T) { + master := Master{} + config := Config{} + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"} + config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil} + + master.minionRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{}) + + servers := master.getServersToValidate(&config) + + if len(servers) != 7 { + t.Errorf("unexpected server list: %#v", servers) + } + for _, server := range []string{"scheduler", "controller-manager", "etcd-0", "etcd-1", "etcd-2", "node-0", "node-1"} { + if _, ok := servers[server]; !ok { + t.Errorf("server list missing: %s", server) + } + } +} diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 0a5c0a31d27..b38d0d8be85 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -43,6 +43,7 @@ var ( // EtcdClient is an injectable interface for testing. type EtcdClient interface { + GetCluster() []string AddChild(key, data string, ttl uint64) (*etcd.Response, error) Get(key string, sort, recursive bool) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error) @@ -56,6 +57,7 @@ type EtcdClient interface { // EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper. type EtcdGetSet interface { + GetCluster() []string Get(key string, sort, recursive bool) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error) Create(key, value string, ttl uint64) (*etcd.Response, error) diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 46ee2b52f26..811f4c8004e 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -50,6 +50,7 @@ type FakeEtcdClient struct { TestIndex bool ChangeIndex uint64 LastSetTTL uint64 + Machines []string // Will become valid after Watch is called; tester may write to it. Tester may // also read from it to verify that it's closed after injecting an error. @@ -83,6 +84,10 @@ func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient { return ret } +func (f *FakeEtcdClient) GetCluster() []string { + return f.Machines +} + func (f *FakeEtcdClient) ExpectNotFoundGet(key string) { f.expectNotFoundGetSet[key] = struct{}{} }