Merge pull request #2126 from brendandburns/validatez

Add etcd to the list of services to validate.
This commit is contained in:
Clayton Coleman 2014-11-10 14:53:41 -05:00
commit 21a6e96418
7 changed files with 127 additions and 34 deletions

View File

@ -85,6 +85,15 @@ func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec, canonicalP
}}
}
func InstallValidator(mux Mux, servers map[string]Server) {
validator, err := NewValidator(servers)
if err != nil {
glog.Errorf("failed to set up validator: %v", err)
return
}
mux.Handle("/validate", validator)
}
// InstallREST registers the REST handlers (storage, watch, and operations) into a mux.
// It is expected that the provided prefix will serve all operations. Path MUST NOT end
// in a slash.
@ -99,16 +108,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) {
redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec}
opHandler := &OperationHandler{g.handler.ops, g.handler.codec}
servers := map[string]string{
"controller-manager": "127.0.0.1:10252",
"scheduler": "127.0.0.1:10251",
// TODO: Add minion health checks here too.
}
validator, err := NewValidator(servers)
if err != nil {
glog.Errorf("failed to set up validator: %v", err)
validator = nil
}
for _, prefix := range paths {
prefix = strings.TrimRight(prefix, "/")
proxyHandler := &ProxyHandler{prefix + "/proxy/", g.handler.storage, g.handler.codec}
@ -119,9 +118,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) {
mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler))
mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler))
mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler))
if validator != nil {
mux.Handle(prefix+"/validate", validator)
}
}
}

View File

@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strconv"
@ -33,20 +32,21 @@ type httpGet interface {
Get(url string) (*http.Response, error)
}
type server struct {
addr string
port int
type Server struct {
Addr string
Port int
Path string
}
// validator is responsible for validating the cluster and serving
type validator struct {
// a list of servers to health check
servers map[string]server
servers map[string]Server
client httpGet
}
func (s *server) check(client httpGet) (health.Status, string, error) {
resp, err := client.Get("http://" + net.JoinHostPort(s.addr, strconv.Itoa(s.port)) + "/healthz")
func (s *Server) check(client httpGet) (health.Status, string, error) {
resp, err := client.Get("http://" + net.JoinHostPort(s.Addr, strconv.Itoa(s.Port)) + s.Path)
if err != nil {
return health.Unknown, "", err
}
@ -82,8 +82,7 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
reply = append(reply, ServerStatus{name, status.String(), status, msg, errorMsg})
}
data, err := json.Marshal(reply)
log.Printf("FOO: %s", string(data))
data, err := json.MarshalIndent(reply, "", " ")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
@ -94,8 +93,15 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// NewValidator creates a validator for a set of servers.
func NewValidator(servers map[string]string) (http.Handler, error) {
result := map[string]server{}
func NewValidator(servers map[string]Server) (http.Handler, error) {
return &validator{
servers: servers,
client: &http.Client{},
}, nil
}
func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) {
result := map[string]Server{}
for name, value := range servers {
host, port, err := net.SplitHostPort(value)
if err != nil {
@ -105,16 +111,10 @@ func NewValidator(servers map[string]string) (http.Handler, error) {
if err != nil {
return nil, fmt.Errorf("invalid server spec: %s (%v)", port, err)
}
result[name] = server{host, val}
result[name] = Server{Addr: host, Port: val, Path: "/healthz"}
}
return &validator{
servers: result,
client: &http.Client{},
}, nil
}
func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) {
v, e := NewValidator(servers)
v, e := NewValidator(result)
if e == nil {
v.(*validator).client = get
}

View File

@ -63,12 +63,12 @@ func TestValidate(t *testing.T) {
{nil, "foo", health.Unhealthy, 500, true},
}
s := server{addr: "foo.com", port: 8080}
s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}
for _, test := range tests {
fake := makeFake(test.data, test.code, test.err)
status, data, err := s.check(fake)
expect := fmt.Sprintf("http://%s:%d/healthz", s.addr, s.port)
expect := fmt.Sprintf("http://%s:%d/healthz", s.Addr, s.Port)
if fake.url != expect {
t.Errorf("expected %s, got %s", expect, fake.url)
}

View File

@ -17,12 +17,15 @@ limitations under the License.
package master
import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
@ -306,6 +309,9 @@ func (m *Master) init(c *Config) {
versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2")
m.mux.Handle(c.APIPrefix, versionHandler)
apiserver.InstallSupport(m.mux)
serversToValidate := m.getServersToValidate(c)
apiserver.InstallValidator(m.mux, serversToValidate)
if c.EnableLogsSupport {
apiserver.InstallLogsSupport(m.mux)
}
@ -340,6 +346,43 @@ func (m *Master) init(c *Config) {
m.masterServices.Start()
}
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: 10252, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: 10251, Path: "/healthz"},
}
for ix, machine := range c.EtcdHelper.Client.GetCluster() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
continue
}
var port int
var addr string
if strings.Contains(etcdUrl.Host, ":") {
var portString string
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
if err != nil {
glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
continue
}
port, _ = strconv.Atoi(portString)
} else {
addr = etcdUrl.Host
port = 4001
}
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/v2/keys/"}
}
nodes, err := m.minionRegistry.ListMinions(api.NewDefaultContext())
if err != nil {
glog.Errorf("Failed to list minions: %v", err)
}
for ix, node := range nodes.Items {
serversToValidate[fmt.Sprintf("node-%d", ix)] = apiserver.Server{Addr: node.HostIP, Port: 10250, Path: "/healthz"}
}
return serversToValidate
}
// API_v1beta1 returns the resources and codec for API version v1beta1.
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) {
storage := make(map[string]apiserver.RESTStorage)

47
pkg/master/master_test.go Normal file
View File

@ -0,0 +1,47 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
)
func TestGetServersToValidate(t *testing.T) {
master := Master{}
config := Config{}
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil}
master.minionRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
servers := master.getServersToValidate(&config)
if len(servers) != 7 {
t.Errorf("unexpected server list: %#v", servers)
}
for _, server := range []string{"scheduler", "controller-manager", "etcd-0", "etcd-1", "etcd-2", "node-0", "node-1"} {
if _, ok := servers[server]; !ok {
t.Errorf("server list missing: %s", server)
}
}
}

View File

@ -43,6 +43,7 @@ var (
// EtcdClient is an injectable interface for testing.
type EtcdClient interface {
GetCluster() []string
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
@ -56,6 +57,7 @@ type EtcdClient interface {
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
type EtcdGetSet interface {
GetCluster() []string
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error)

View File

@ -50,6 +50,7 @@ type FakeEtcdClient struct {
TestIndex bool
ChangeIndex uint64
LastSetTTL uint64
Machines []string
// Will become valid after Watch is called; tester may write to it. Tester may
// also read from it to verify that it's closed after injecting an error.
@ -83,6 +84,10 @@ func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
return ret
}
func (f *FakeEtcdClient) GetCluster() []string {
return f.Machines
}
func (f *FakeEtcdClient) ExpectNotFoundGet(key string) {
f.expectNotFoundGetSet[key] = struct{}{}
}