Merge pull request #875 from brendandburns/exec

Add a in container exec based health check.
This commit is contained in:
Tim Hockin 2014-08-18 12:19:22 -07:00
commit e472752ff9
14 changed files with 207 additions and 26 deletions

View File

@ -21,7 +21,7 @@ DAEMON_ARGS=""
DAEMON_LOG_FILE=/var/log/$NAME.log
PIDFILE=/var/run/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
DAEMON_USER=kubelet
DAEMON_USER=root
# Exit if the package is not installed
[ -x "$DAEMON" ] || exit 0

View File

@ -30,6 +30,7 @@ import (
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
@ -151,6 +152,10 @@ func main() {
*rootDirectory,
*syncFrequency)
health.AddHealthChecker("exec", health.NewExecHealthChecker(k))
health.AddHealthChecker("http", health.NewHTTPHealthChecker(&http.Client{}))
health.AddHealthChecker("tcp", &health.TCPHealthChecker{})
// start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)

View File

@ -150,6 +150,15 @@ type TCPSocketProbe struct {
Port util.IntOrString `yaml:"port,omitempty" json:"port,omitempty"`
}
// ExecProbe describes a "run in container" health probe.
type ExecProbe struct {
// Command is the command line to execute inside the container, the working directory for the
// command is root ('/') in the container's filesystem. The command is simply exec'd, it is
// not run inside a shell, so traditional shell instructions ('|', etc) won't work. To use
// a shell, you need to explicitly call out to that shell
Command []string `yaml:"command,omitempty" json:"command,omitempty"`
}
// LivenessProbe describes a liveness probe to be examined to the container.
type LivenessProbe struct {
// Type of liveness probe. Current legal values "http", "tcp"
@ -158,6 +167,8 @@ type LivenessProbe struct {
HTTPGet *HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
// TCPSocketProbe parameter, required if Type == 'tcp'
TCPSocket *TCPSocketProbe `yaml:"tcpSocket,omitempty" json:"tcpSocket,omitempty"`
// ExecProbe parameter, required if Type == 'exec'
Exec *ExecProbe `yaml:"exec,omitempty" json:"exec,omitempty"`
// Length of time before health checking is activated. In seconds.
InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
}

View File

@ -153,6 +153,15 @@ type TCPSocketProbe struct {
Port util.IntOrString `yaml:"port,omitempty" json:"port,omitempty"`
}
// ExecProbe describes a "run in container" health probe.
type ExecProbe struct {
// Command is the command line to execute inside the container, the working directory for the
// command is root ('/') in the container's filesystem. The command is simply exec'd, it is
// not run inside a shell, so traditional shell instructions ('|', etc) won't work. To use
// a shell, you need to explicitly call out to that shell
Command []string `yaml:"command,omitempty" json:"command,omitempty"`
}
// LivenessProbe describes a liveness probe to be examined to the container.
type LivenessProbe struct {
// Type of liveness probe. Current legal values "http", "tcp"
@ -161,6 +170,8 @@ type LivenessProbe struct {
HTTPGet *HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
// TCPSocketProbe parameter, required if Type == 'tcp'
TCPSocket *TCPSocketProbe `yaml:"tcpSocket,omitempty" json:"tcpSocket,omitempty"`
// ExecProbe parameter, required if Type == 'exec'
Exec *ExecProbe `yaml:"exec,omitempty" json:"exec,omitempty"`
// Length of time before health checking is activated. In seconds.
InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
}

59
pkg/health/exec.go Normal file
View File

@ -0,0 +1,59 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"fmt"
"os/exec"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)
const defaultHealthyRegex = "^OK$"
type CommandRunner interface {
RunInContainer(podFullName, containerName string, cmd []string) ([]byte, error)
}
type ExecHealthChecker struct {
runner CommandRunner
}
func NewExecHealthChecker(runner CommandRunner) HealthChecker {
return &ExecHealthChecker{runner}
}
func IsExitError(err error) bool {
_, ok := err.(*exec.ExitError)
return ok
}
func (e *ExecHealthChecker) HealthCheck(podFullName string, currentState api.PodState, container api.Container) (Status, error) {
if container.LivenessProbe.Exec == nil {
return Unknown, fmt.Errorf("Missing exec parameters")
}
data, err := e.runner.RunInContainer(podFullName, container.Name, container.LivenessProbe.Exec.Command)
glog.V(1).Infof("container %s failed health check: %s", podFullName, string(data))
if err != nil {
if IsExitError(err) {
return Unhealthy, nil
}
return Unknown, err
}
return Healthy, nil
}

90
pkg/health/exec_test.go Normal file
View File

@ -0,0 +1,90 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"fmt"
"os/exec"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type FakeExec struct {
cmd []string
out []byte
err error
}
func (f *FakeExec) RunInContainer(podFullName, container string, cmd []string) ([]byte, error) {
f.cmd = cmd
return f.out, f.err
}
type healthCheckTest struct {
expectedStatus Status
probe *api.LivenessProbe
expectError bool
output []byte
err error
}
func TestExec(t *testing.T) {
fake := FakeExec{}
checker := ExecHealthChecker{&fake}
tests := []healthCheckTest{
// Missing parameters
{Unknown, &api.LivenessProbe{Type: "exec"}, true, nil, nil},
// Ok
{Healthy, &api.LivenessProbe{
Type: "exec",
Exec: &api.ExecProbe{Command: []string{"ls", "-l"}},
}, false, []byte("OK"), nil},
// Run returns error
{Unknown, &api.LivenessProbe{
Type: "exec",
Exec: &api.ExecProbe{
Command: []string{"ls", "-l"},
},
}, true, []byte("OK, NOT"), fmt.Errorf("test error")},
// Command error
{Unhealthy, &api.LivenessProbe{
Type: "exec",
Exec: &api.ExecProbe{
Command: []string{"ls", "-l"},
},
}, false, []byte{}, &exec.ExitError{}},
}
for _, test := range tests {
fake.out = test.output
fake.err = test.err
status, err := checker.HealthCheck("test", api.PodState{}, api.Container{LivenessProbe: test.probe})
if status != test.expectedStatus {
t.Errorf("expected %v, got %v", test.expectedStatus, status)
}
if err != nil && test.expectError == false {
t.Errorf("unexpected error: %v", err)
}
if err == nil && test.expectError == true {
t.Errorf("unexpected non-error")
}
if test.probe.Exec != nil && !reflect.DeepEqual(fake.cmd, test.probe.Exec.Command) {
t.Errorf("expected: %v, got %v", test.probe.Exec.Command, fake.cmd)
}
}
}

View File

@ -17,8 +17,6 @@ limitations under the License.
package health
import (
"net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)
@ -35,18 +33,19 @@ const (
// HealthChecker defines an abstract interface for checking container health.
type HealthChecker interface {
HealthCheck(currentState api.PodState, container api.Container) (Status, error)
HealthCheck(podFullName string, currentState api.PodState, container api.Container) (Status, error)
}
var checkers = map[string]HealthChecker{}
func AddHealthChecker(key string, checker HealthChecker) {
checkers[key] = checker
}
// NewHealthChecker creates a new HealthChecker which supports multiple types of liveness probes.
func NewHealthChecker() HealthChecker {
return &muxHealthChecker{
checkers: map[string]HealthChecker{
"http": &HTTPHealthChecker{
client: &http.Client{},
},
"tcp": &TCPHealthChecker{},
},
checkers: checkers,
}
}
@ -58,13 +57,13 @@ type muxHealthChecker struct {
// HealthCheck delegates the health-checking of the container to one of the bundled implementations.
// It chooses an implementation according to container.LivenessProbe.Type.
// If there is no matching health checker it returns Unknown, nil.
func (m *muxHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
func (m *muxHealthChecker) HealthCheck(podFullName string, currentState api.PodState, container api.Container) (Status, error) {
checker, ok := m.checkers[container.LivenessProbe.Type]
if !ok || checker == nil {
glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type)
return Unknown, nil
}
return checker.HealthCheck(currentState, container)
return checker.HealthCheck(podFullName, currentState, container)
}
// A helper function to look up a port in a container by name.

View File

@ -30,6 +30,7 @@ import (
const statusServerEarlyShutdown = -1
func TestHealthChecker(t *testing.T) {
AddHealthChecker("http", &HTTPHealthChecker{client: &http.Client{}})
var healthCheckerTests = []struct {
status int
health Status
@ -67,7 +68,7 @@ func TestHealthChecker(t *testing.T) {
},
}
hc := NewHealthChecker()
health, err := hc.HealthCheck(api.PodState{}, container)
health, err := hc.HealthCheck("test", api.PodState{}, container)
if err != nil && tt.health != Unknown {
t.Errorf("Unexpected error: %v", err)
}
@ -133,7 +134,7 @@ func TestMuxHealthChecker(t *testing.T) {
container.LivenessProbe.Type = tt.probeType
container.LivenessProbe.HTTPGet.Port = util.MakeIntOrStringFromString(port)
container.LivenessProbe.HTTPGet.Host = host
health, err := mc.HealthCheck(api.PodState{}, container)
health, err := mc.HealthCheck("test", api.PodState{}, container)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

View File

@ -38,6 +38,10 @@ type HTTPHealthChecker struct {
client HTTPGetInterface
}
func NewHTTPHealthChecker(client *http.Client) HealthChecker {
return &HTTPHealthChecker{client: &http.Client{}}
}
// Get the components of the target URL. For testability.
func getURLParts(currentState api.PodState, container api.Container) (string, int, string, error) {
params := container.LivenessProbe.HTTPGet
@ -94,7 +98,7 @@ func DoHTTPCheck(url string, client HTTPGetInterface) (Status, error) {
}
// HealthCheck checks if the container is healthy by trying sending HTTP Get requests to the container.
func (h *HTTPHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
func (h *HTTPHealthChecker) HealthCheck(podFullName string, currentState api.PodState, container api.Container) (Status, error) {
host, port, path, err := getURLParts(currentState, container)
if err != nil {
return Unknown, err

View File

@ -125,7 +125,7 @@ func TestHTTPHealthChecker(t *testing.T) {
params.Port = util.MakeIntOrStringFromString(port)
params.Host = host
}
health, err := hc.HealthCheck(api.PodState{PodIP: host}, container)
health, err := hc.HealthCheck("test", api.PodState{PodIP: host}, container)
if test.health == Unknown && err == nil {
t.Errorf("Expected error")
}

View File

@ -74,7 +74,7 @@ func DoTCPCheck(addr string) (Status, error) {
return Healthy, nil
}
func (t *TCPHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
func (t *TCPHealthChecker) HealthCheck(podFullName string, currentState api.PodState, container api.Container) (Status, error) {
host, port, err := getTCPAddrParts(currentState, container)
if err != nil {
return Unknown, err

View File

@ -102,7 +102,7 @@ func TestTcpHealthChecker(t *testing.T) {
if params != nil && test.expectedStatus == Healthy {
params.Port = util.MakeIntOrStringFromString(port)
}
status, err := checker.HealthCheck(api.PodState{PodIP: host}, container)
status, err := checker.HealthCheck("test", api.PodState{PodIP: host}, container)
if status != test.expectedStatus {
t.Errorf("expected: %v, got: %v", test.expectedStatus, status)
}

View File

@ -74,6 +74,7 @@ func NewMainKubelet(
rootDirectory: rd,
resyncInterval: ri,
podWorkers: newPodWorkers(),
runner: NewDockerContainerCommandRunner(),
}
}
@ -451,9 +452,10 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
// look for changes in the container.
if hash == 0 || hash == expectedHash {
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(podState, container, dockerContainer)
healthy, err := kl.healthy(podFullName, podState, container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
containersToKeep[containerID] = empty{}
continue
}
if healthy == health.Healthy {
@ -702,7 +704,7 @@ func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) {
return kl.cadvisorClient.MachineInfo()
}
func (kl *Kubelet) healthy(currentState api.PodState, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
func (kl *Kubelet) healthy(podFullName string, currentState api.PodState, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
// Give the container 60 seconds to start up.
if container.LivenessProbe == nil {
return health.Healthy, nil
@ -713,7 +715,7 @@ func (kl *Kubelet) healthy(currentState api.PodState, container api.Container, d
if kl.healthChecker == nil {
return health.Healthy, nil
}
return kl.healthChecker.HealthCheck(currentState, container)
return kl.healthChecker.HealthCheck(podFullName, currentState, container)
}
// Returns logs of current machine.
@ -723,11 +725,10 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
}
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(pod *Pod, container string, cmd []string) ([]byte, error) {
func (kl *Kubelet) RunInContainer(podFullName, container string, cmd []string) ([]byte, error) {
if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
}
podFullName := GetPodFullName(pod)
dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
if err != nil {
return nil, err

View File

@ -473,7 +473,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
type FalseHealthChecker struct{}
func (f *FalseHealthChecker) HealthCheck(state api.PodState, container api.Container) (health.Status, error) {
func (f *FalseHealthChecker) HealthCheck(podFullName string, state api.PodState, container api.Container) (health.Status, error) {
return health.Unhealthy, nil
}
@ -1056,7 +1056,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
podNamespace := "etcd"
containerName := "containerFoo"
output, err := kubelet.RunInContainer(
&Pod{Name: podName, Namespace: podNamespace},
podName+"."+podNamespace,
containerName,
[]string{"ls"})
if output != nil {
@ -1086,7 +1086,7 @@ func TestRunInContainer(t *testing.T) {
cmd := []string{"ls"}
_, err := kubelet.RunInContainer(
&Pod{Name: podName, Namespace: podNamespace},
podName+"."+podNamespace,
containerName,
cmd)
if fakeCommandRunner.ID != containerID {