Merge pull request #456 from brendandburns/health

Move health checking logic out to a utility. Add a minion registry that health checks.
This commit is contained in:
Daniel Smith 2014-07-15 15:39:21 -07:00
commit 3d63d733e3
9 changed files with 323 additions and 44 deletions

18
pkg/health/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package health contains utilities for health checking, as well as health status information.
package health

51
pkg/health/health.go Normal file
View File

@ -0,0 +1,51 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"net/http"
"github.com/golang/glog"
)
type Status int
const (
Healthy Status = iota
Unhealthy
Unknown
)
type HTTPGetInterface interface {
Get(url string) (*http.Response, error)
}
func Check(url string, client HTTPGetInterface) (Status, error) {
res, err := client.Get(url)
if res.Body != nil {
defer res.Body.Close()
}
if err != nil {
return Unknown, err
}
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
return Healthy, nil
} else {
glog.V(1).Infof("Health check failed for %s, Response: %v", url, *res)
return Unhealthy, nil
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kubelet package health
import ( import (
"fmt" "fmt"
@ -25,20 +25,8 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
type HealthCheckStatus int
const (
CheckHealthy HealthCheckStatus = 0
CheckUnhealthy HealthCheckStatus = 1
CheckUnknown HealthCheckStatus = 2
)
type HealthChecker interface { type HealthChecker interface {
HealthCheck(container api.Container) (HealthCheckStatus, error) HealthCheck(container api.Container) (Status, error)
}
type httpDoInterface interface {
Get(string) (*http.Response, error)
} }
// MakeHealthChecker creates a new HealthChecker. // MakeHealthChecker creates a new HealthChecker.
@ -57,18 +45,18 @@ type MuxHealthChecker struct {
checkers map[string]HealthChecker checkers map[string]HealthChecker
} }
func (m *MuxHealthChecker) HealthCheck(container api.Container) (HealthCheckStatus, error) { func (m *MuxHealthChecker) HealthCheck(container api.Container) (Status, error) {
checker, ok := m.checkers[container.LivenessProbe.Type] checker, ok := m.checkers[container.LivenessProbe.Type]
if !ok || checker == nil { if !ok || checker == nil {
glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type) glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type)
return CheckUnknown, nil return Unknown, nil
} }
return checker.HealthCheck(container) return checker.HealthCheck(container)
} }
// HTTPHealthChecker is an implementation of HealthChecker which checks container health by sending HTTP Get requests. // HTTPHealthChecker is an implementation of HealthChecker which checks container health by sending HTTP Get requests.
type HTTPHealthChecker struct { type HTTPHealthChecker struct {
client httpDoInterface client HTTPGetInterface
} }
func (h *HTTPHealthChecker) findPort(container api.Container, portName string) int64 { func (h *HTTPHealthChecker) findPort(container api.Container, portName string) int64 {
@ -81,17 +69,17 @@ func (h *HTTPHealthChecker) findPort(container api.Container, portName string) i
return -1 return -1
} }
func (h *HTTPHealthChecker) HealthCheck(container api.Container) (HealthCheckStatus, error) { func (h *HTTPHealthChecker) HealthCheck(container api.Container) (Status, error) {
params := container.LivenessProbe.HTTPGet params := container.LivenessProbe.HTTPGet
if params == nil { if params == nil {
return CheckUnknown, fmt.Errorf("Error, no HTTP parameters specified: %v", container) return Unknown, fmt.Errorf("Error, no HTTP parameters specified: %v", container)
} }
port := h.findPort(container, params.Port) port := h.findPort(container, params.Port)
if port == -1 { if port == -1 {
var err error var err error
port, err = strconv.ParseInt(params.Port, 10, 0) port, err = strconv.ParseInt(params.Port, 10, 0)
if err != nil { if err != nil {
return CheckUnknown, err return Unknown, err
} }
} }
var host string var host string
@ -101,17 +89,5 @@ func (h *HTTPHealthChecker) HealthCheck(container api.Container) (HealthCheckSta
host = "localhost" host = "localhost"
} }
url := fmt.Sprintf("http://%s:%d%s", host, port, params.Path) url := fmt.Sprintf("http://%s:%d%s", host, port, params.Path)
res, err := h.client.Get(url) return Check(url, h.client)
if res != nil && res.Body != nil {
defer res.Body.Close()
}
if err != nil {
// At this point, if it fails, its either a policy (unlikely) or HTTP protocol (likely) error.
return CheckUnhealthy, nil
}
if res.StatusCode == http.StatusOK {
return CheckHealthy, nil
}
glog.V(1).Infof("Health check failed for %v, Response: %v", container, *res)
return CheckUnhealthy, nil
} }

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kubelet package health
import ( import (
"net/http" "net/http"
@ -56,7 +56,7 @@ func TestHttpHealth(t *testing.T) {
} }
ok, err := check.HealthCheck(container) ok, err := check.HealthCheck(container)
if ok != CheckHealthy { if ok != Healthy {
t.Error("Unexpected unhealthy") t.Error("Unexpected unhealthy")
} }
if err != nil { if err != nil {

View File

@ -33,6 +33,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -68,7 +69,7 @@ type Kubelet struct {
SyncFrequency time.Duration SyncFrequency time.Duration
HTTPCheckFrequency time.Duration HTTPCheckFrequency time.Duration
pullLock sync.Mutex pullLock sync.Mutex
HealthChecker HealthChecker HealthChecker health.HealthChecker
} }
type manifestUpdate struct { type manifestUpdate struct {
@ -133,7 +134,7 @@ func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL, etcdServe
} }
go util.Forever(func() { s.ListenAndServe() }, 0) go util.Forever(func() { s.ListenAndServe() }, 0)
} }
kl.HealthChecker = MakeHealthChecker() kl.HealthChecker = health.MakeHealthChecker()
kl.syncLoop(updateChannel, kl) kl.syncLoop(updateChannel, kl)
} }
@ -624,7 +625,7 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha
glog.V(1).Infof("health check errored: %v", err) glog.V(1).Infof("health check errored: %v", err)
continue continue
} }
if healthy != CheckHealthy { if healthy != health.Healthy {
glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name) glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name)
if err != nil { if err != nil {
glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID) glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID)
@ -881,16 +882,16 @@ func (kl *Kubelet) GetMachineStats() (*api.ContainerStats, error) {
return kl.statsFromContainerPath("/") return kl.statsFromContainerPath("/")
} }
func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (HealthCheckStatus, error) { func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
// Give the container 60 seconds to start up. // Give the container 60 seconds to start up.
if container.LivenessProbe == nil { if container.LivenessProbe == nil {
return CheckHealthy, nil return health.Healthy, nil
} }
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds { if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
return CheckHealthy, nil return health.Healthy, nil
} }
if kl.HealthChecker == nil { if kl.HealthChecker == nil {
return CheckHealthy, nil return health.Healthy, nil
} }
return kl.HealthChecker.HealthCheck(container) return kl.HealthChecker.HealthCheck(container)
} }

View File

@ -26,6 +26,7 @@ import (
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@ -422,8 +423,8 @@ func TestSyncManifestsDeletes(t *testing.T) {
type FalseHealthChecker struct{} type FalseHealthChecker struct{}
func (f *FalseHealthChecker) HealthCheck(container api.Container) (HealthCheckStatus, error) { func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status, error) {
return CheckUnhealthy, nil return health.Unhealthy, nil
} }
func TestSyncManifestsUnhealthy(t *testing.T) { func TestSyncManifestsUnhealthy(t *testing.T) {

View File

@ -0,0 +1,86 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"fmt"
"net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
)
type HealthyMinionRegistry struct {
delegate MinionRegistry
client health.HTTPGetInterface
port int
}
func NewHealthyMinionRegistry(delegate MinionRegistry, client *http.Client) MinionRegistry {
return &HealthyMinionRegistry{
delegate: delegate,
client: client,
port: 10250,
}
}
func (h *HealthyMinionRegistry) makeMinionURL(minion string) string {
return fmt.Sprintf("http://%s:%d/healthz", minion, h.port)
}
func (h *HealthyMinionRegistry) List() (currentMinions []string, err error) {
var result []string
list, err := h.delegate.List()
if err != nil {
return result, err
}
for _, minion := range list {
status, err := health.Check(h.makeMinionURL(minion), h.client)
if err != nil {
return result, err
}
if status == health.Healthy {
result = append(result, minion)
}
}
return result, nil
}
func (h *HealthyMinionRegistry) Insert(minion string) error {
return h.delegate.Insert(minion)
}
func (h *HealthyMinionRegistry) Delete(minion string) error {
return h.delegate.Delete(minion)
}
func (h *HealthyMinionRegistry) Contains(minion string) (bool, error) {
contains, err := h.delegate.Contains(minion)
if err != nil {
return false, err
}
if !contains {
return false, nil
}
status, err := health.Check(h.makeMinionURL(minion), h.client)
if err != nil {
return false, err
}
if status == health.Unhealthy {
return false, nil
}
return true, nil
}

View File

@ -0,0 +1,96 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"net/http"
"reflect"
"testing"
)
type alwaysYes struct{}
func (alwaysYes) Get(url string) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
}, nil
}
func TestBasicDelegation(t *testing.T) {
mockMinionRegistry := MockMinionRegistry{
minions: []string{"m1", "m2", "m3"},
}
healthy := HealthyMinionRegistry{
delegate: &mockMinionRegistry,
client: alwaysYes{},
}
list, err := healthy.List()
expectNoError(t, err)
if !reflect.DeepEqual(list, mockMinionRegistry.minions) {
t.Errorf("Expected %v, Got %v", mockMinionRegistry.minions, list)
}
err = healthy.Insert("foo")
expectNoError(t, err)
ok, err := healthy.Contains("m1")
expectNoError(t, err)
if !ok {
t.Errorf("Unexpected absence of 'm1'")
}
ok, err = healthy.Contains("m5")
expectNoError(t, err)
if ok {
t.Errorf("Unexpected presence of 'm5'")
}
}
type notMinion struct {
minion string
}
func (n *notMinion) Get(url string) (*http.Response, error) {
if url != "http://"+n.minion+":10250/healthz" {
return &http.Response{StatusCode: http.StatusOK}, nil
} else {
return &http.Response{StatusCode: http.StatusInternalServerError}, nil
}
}
func TestFiltering(t *testing.T) {
mockMinionRegistry := MockMinionRegistry{
minions: []string{"m1", "m2", "m3"},
}
healthy := HealthyMinionRegistry{
delegate: &mockMinionRegistry,
client: &notMinion{minion: "m1"},
port: 10250,
}
expected := []string{"m2", "m3"}
list, err := healthy.List()
expectNoError(t, err)
if !reflect.DeepEqual(list, expected) {
t.Errorf("Expected %v, Got %v", expected, list)
}
ok, err := healthy.Contains("m1")
expectNoError(t, err)
if ok {
t.Errorf("Unexpected presence of 'm1'")
}
}

View File

@ -75,3 +75,53 @@ func (registry *MockPodRegistry) DeletePod(podId string) error {
defer registry.Unlock() defer registry.Unlock()
return registry.err return registry.err
} }
type MockMinionRegistry struct {
err error
minion string
minions []string
sync.Mutex
}
func MakeMockMinionRegistry(minions []string) *MockMinionRegistry {
return &MockMinionRegistry{
minions: minions,
}
}
func (registry *MockMinionRegistry) List() ([]string, error) {
registry.Lock()
defer registry.Unlock()
return registry.minions, registry.err
}
func (registry *MockMinionRegistry) Insert(minion string) error {
registry.Lock()
defer registry.Unlock()
registry.minion = minion
return registry.err
}
func (registry *MockMinionRegistry) Contains(minion string) (bool, error) {
registry.Lock()
defer registry.Unlock()
for _, name := range registry.minions {
if name == minion {
return true, registry.err
}
}
return false, registry.err
}
func (registry *MockMinionRegistry) Delete(minion string) error {
registry.Lock()
defer registry.Unlock()
var newList []string
for _, name := range registry.minions {
if name != minion {
newList = append(newList, name)
}
}
registry.minions = newList
return registry.err
}