Add a "tcp" liveness probe type and pass the pod's current state to health checkers.

* api, api/v1beta1: add TCPSocketProbe and LivenessProbe.TCPSocket.
* client: the Update* methods fetch the stored object and copy its ResourceVersion before the PUT.
* health: HealthCheck takes an api.PodState; the HTTP probe defaults its host to PodIP instead of
  "localhost"; add TCPHealthChecker.
* kubelet: syncPod reads the pod IP from the network container's info and hands it to the health check.

NOTE(review): whitespace inside context lines was reconstructed from a line-collapsed copy of this
patch, and the index lines are those of the original patch; apply with whitespace tolerance
(for example: git apply --ignore-whitespace).

diff --git a/pkg/api/types.go b/pkg/api/types.go
index 182b32f0d03..861f2727240 100644
--- a/pkg/api/types.go
+++ b/pkg/api/types.go
@@ -144,12 +144,20 @@ type HTTPGetProbe struct {
 	Host string `yaml:"host,omitempty" json:"host,omitempty"`
 }
 
+// TCPSocketProbe describes a liveness probe based on opening a socket
+type TCPSocketProbe struct {
+	// Port is the port to connect to. Required.
+	Port int `yaml:"port,omitempty" json:"port,omitempty"`
+}
+
 // LivenessProbe describes a liveness probe to be examined to the container.
 type LivenessProbe struct {
-	// Type of liveness probe. Current legal values "http"
+	// Type of liveness probe. Current legal values "http", "tcp"
 	Type string `yaml:"type,omitempty" json:"type,omitempty"`
 	// HTTPGetProbe parameters, required if Type == 'http'
 	HTTPGet *HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
+	// TCPSocketProbe parameter, required if Type == 'tcp'
+	TCPSocket *TCPSocketProbe `yaml:"tcpSocket,omitempty" json:"tcpSocket,omitempty"`
 	// Length of time before health checking is activated. In seconds.
 	InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
 }
diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go
index aa2615f0225..0fcab0a355b 100644
--- a/pkg/api/v1beta1/types.go
+++ b/pkg/api/v1beta1/types.go
@@ -147,12 +147,20 @@ type HTTPGetProbe struct {
 	Host string `yaml:"host,omitempty" json:"host,omitempty"`
 }
 
+// TCPSocketProbe describes a liveness probe based on opening a socket
+type TCPSocketProbe struct {
+	// Port is the port to connect to. Required.
+	Port int `yaml:"port,omitempty" json:"port,omitempty"`
+}
+
 // LivenessProbe describes a liveness probe to be examined to the container.
 type LivenessProbe struct {
-	// Type of liveness probe. Current legal values "http"
+	// Type of liveness probe. Current legal values "http", "tcp"
 	Type string `yaml:"type,omitempty" json:"type,omitempty"`
 	// HTTPGetProbe parameters, required if Type == 'http'
 	HTTPGet *HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
+	// TCPSocketProbe parameter, required if Type == 'tcp'
+	TCPSocket *TCPSocketProbe `yaml:"tcpSocket,omitempty" json:"tcpSocket,omitempty"`
 	// Length of time before health checking is activated. In seconds.
 	InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
 }
diff --git a/pkg/client/client.go b/pkg/client/client.go
index b86ec2e35d0..76b76e33e81 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -192,6 +192,12 @@ func (c *Client) CreatePod(pod api.Pod) (result api.Pod, err error) {
 
 // UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs
 func (c *Client) UpdatePod(pod api.Pod) (result api.Pod, err error) {
+	var prev api.Pod
+	err = c.Get().Path("pods").Path(pod.ID).Do().Into(&prev)
+	if err != nil {
+		return
+	}
+	pod.ResourceVersion = prev.ResourceVersion
 	err = c.Put().Path("pods").Path(pod.ID).Body(pod).Do().Into(&result)
 	return
 }
@@ -216,6 +222,12 @@ func (c *Client) CreateReplicationController(controller api.ReplicationControlle
 
 // UpdateReplicationController updates an existing replication controller
 func (c *Client) UpdateReplicationController(controller api.ReplicationController) (result api.ReplicationController, err error) {
+	var prev api.ReplicationController
+	err = c.Get().Path("replicationControllers").Path(controller.ID).Do().Into(&prev)
+	if err != nil {
+		return
+	}
+	controller.ResourceVersion = prev.ResourceVersion
 	err = c.Put().Path("replicationControllers").Path(controller.ID).Body(controller).Do().Into(&result)
 	return
 }
@@ -239,6 +251,12 @@ func (c *Client) CreateService(svc api.Service) (result api.Service, err error)
 
 // UpdateService updates an existing service.
 func (c *Client) UpdateService(svc api.Service) (result api.Service, err error) {
+	var prev api.Service
+	err = c.Get().Path("services").Path(svc.ID).Do().Into(&prev)
+	if err != nil {
+		return
+	}
+	svc.ResourceVersion = prev.ResourceVersion
 	err = c.Put().Path("services").Path(svc.ID).Body(svc).Do().Into(&result)
 	return
 }
diff --git a/pkg/health/health_check.go b/pkg/health/health_check.go
index 8820b92c5fd..abb06fa7be2 100644
--- a/pkg/health/health_check.go
+++ b/pkg/health/health_check.go
@@ -18,6 +18,7 @@ package health
 
 import (
 	"fmt"
+	"net"
 	"net/http"
 	"strconv"
 
@@ -27,7 +28,7 @@ import (
 
 // HealthChecker defines an abstract interface for checking container health.
 type HealthChecker interface {
-	HealthCheck(container api.Container) (Status, error)
+	HealthCheck(currentState api.PodState, container api.Container) (Status, error)
 }
 
 // NewHealthChecker creates a new HealthChecker which supports multiple types of liveness probes.
@@ -37,6 +38,7 @@ func NewHealthChecker() HealthChecker {
 			"http": &HTTPHealthChecker{
 				client: &http.Client{},
 			},
+			"tcp": &TCPHealthChecker{},
 		},
 	}
 }
@@ -49,13 +51,13 @@ type MuxHealthChecker struct {
 // HealthCheck delegates the health-checking of the container to one of the bundled implementations.
 // It chooses an implementation according to container.LivenessProbe.Type.
 // If there is no matching health checker it returns Unknown, nil.
-func (m *MuxHealthChecker) HealthCheck(container api.Container) (Status, error) {
+func (m *MuxHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
 	checker, ok := m.checkers[container.LivenessProbe.Type]
 	if !ok || checker == nil {
 		glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type)
 		return Unknown, nil
 	}
-	return checker.HealthCheck(container)
+	return checker.HealthCheck(currentState, container)
 }
 
 // HTTPHealthChecker is an implementation of HealthChecker which checks container health by sending HTTP Get requests.
@@ -74,7 +76,7 @@ func (h *HTTPHealthChecker) findPort(container api.Container, portName string) i
 }
 
 // HealthCheck checks if the container is healthy by trying sending HTTP Get requests to the container.
-func (h *HTTPHealthChecker) HealthCheck(container api.Container) (Status, error) {
+func (h *HTTPHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
 	params := container.LivenessProbe.HTTPGet
 	if params == nil {
 		return Unknown, fmt.Errorf("Error, no HTTP parameters specified: %v", container)
@@ -91,8 +93,36 @@ func (h *HTTPHealthChecker) HealthCheck(container api.Container) (Status, error)
 	if len(params.Host) > 0 {
 		host = params.Host
 	} else {
-		host = "localhost"
+		host = currentState.PodIP
 	}
 	url := fmt.Sprintf("http://%s:%d%s", host, port, params.Path)
 	return Check(url, h.client)
 }
+
+// TCPHealthChecker is an implementation of HealthChecker which checks container health by opening a TCP socket.
+type TCPHealthChecker struct{}
+
+// HealthCheck checks if the container is healthy by trying to open a TCP connection to the pod IP
+// in currentState on the port given by container.LivenessProbe.TCPSocket.
+// It returns Unknown and an error if the TCP parameters or the pod IP are missing, Unhealthy if
+// the connection cannot be established, and Healthy otherwise.
+func (t *TCPHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
+	params := container.LivenessProbe.TCPSocket
+	if params == nil {
+		return Unknown, fmt.Errorf("error, no TCP parameters specified: %v", container)
+	}
+	if len(currentState.PodIP) == 0 {
+		return Unknown, fmt.Errorf("no host specified")
+	}
+	conn, err := net.Dial("tcp", net.JoinHostPort(currentState.PodIP, strconv.Itoa(params.Port)))
+	if err != nil {
+		// A failed dial is a probe result, not a probe error.
+		return Unhealthy, nil
+	}
+	// The connection succeeded, so a failure to close it is only logged.
+	err = conn.Close()
+	if err != nil {
+		glog.Errorf("unexpected error closing health check socket: %v (%#v)", err, err)
+	}
+	return Healthy, nil
+}
diff --git a/pkg/health/health_check_test.go b/pkg/health/health_check_test.go
index 3997adac92b..bc9841680cd 100644
--- a/pkg/health/health_check_test.go
+++ b/pkg/health/health_check_test.go
@@ -21,6 +21,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"strconv"
 	"testing"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -66,7 +67,7 @@ func TestHealthChecker(t *testing.T) {
 			},
 		}
 		hc := NewHealthChecker()
-		health, err := hc.HealthCheck(container)
+		health, err := hc.HealthCheck(api.PodState{}, container)
 		if err != nil && tt.health != Unknown {
 			t.Errorf("Unexpected error: %v", err)
 		}
@@ -139,7 +140,7 @@ func TestHTTPHealthChecker(t *testing.T) {
 				params.Host = host
 			}
 		}
-		health, err := hc.HealthCheck(container)
+		health, err := hc.HealthCheck(api.PodState{PodIP: host}, container)
 		if tt.health == Unknown && err == nil {
 			t.Errorf("Expected error")
 		}
@@ -152,6 +153,54 @@ func TestHTTPHealthChecker(t *testing.T) {
 	}
 }
 
+func TestTcpHealthChecker(t *testing.T) {
+	type tcpHealthTest struct {
+		probe          *api.LivenessProbe
+		expectedStatus Status
+		expectError    bool
+	}
+
+	checker := &TCPHealthChecker{}
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer server.Close()
+	u, err := url.Parse(server.URL)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	host, port, err := net.SplitHostPort(u.Host)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	portNum, err := strconv.Atoi(port)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	tests := []tcpHealthTest{
+		{&api.LivenessProbe{TCPSocket: &api.TCPSocketProbe{Port: portNum}}, Healthy, false},
+		{&api.LivenessProbe{TCPSocket: &api.TCPSocketProbe{Port: 100000}}, Unhealthy, false},
+		{&api.LivenessProbe{}, Unknown, true},
+	}
+	for _, test := range tests {
+		probe := test.probe
+		container := api.Container{
+			LivenessProbe: probe,
+		}
+		status, err := checker.HealthCheck(api.PodState{PodIP: host}, container)
+		if status != test.expectedStatus {
+			t.Errorf("expected: %v, got: %v", test.expectedStatus, status)
+		}
+		if err != nil && !test.expectError {
+			t.Errorf("unexpected error: %#v", err)
+		}
+		if err == nil && test.expectError {
+			t.Errorf("unexpected non-error.")
+		}
+	}
+}
+
 func TestMuxHealthChecker(t *testing.T) {
 	muxHealthCheckerTests := []struct {
 		health Status
@@ -188,7 +237,7 @@ func TestMuxHealthChecker(t *testing.T) {
 		container.LivenessProbe.Type = tt.probeType
 		container.LivenessProbe.HTTPGet.Port = port
 		container.LivenessProbe.HTTPGet.Host = host
-		health, err := mc.HealthCheck(container)
+		health, err := mc.HealthCheck(api.PodState{}, container)
 		if err != nil {
 			t.Errorf("Unexpected error: %v", err)
 		}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 70bd76f2f70..6729fb15afc 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -376,6 +376,16 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
 		return err
 	}
 
+	podState := api.PodState{}
+	info, err := kl.GetPodInfo(podFullName)
+	if err != nil {
+		glog.Errorf("Unable to get pod info, health checks may be invalid: %v", err)
+	}
+	netInfo, found := info[networkContainerName]
+	if found && netInfo.NetworkSettings != nil {
+		podState.PodIP = netInfo.NetworkSettings.IPAddress
+	}
+
 	for _, container := range pod.Manifest.Containers {
 		if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found {
 			containerID := DockerID(dockerContainer.ID)
@@ -383,7 +393,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
 			glog.V(1).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
 
 			// TODO: This should probably be separated out into a separate goroutine.
-			healthy, err := kl.healthy(container, dockerContainer)
+			healthy, err := kl.healthy(podState, container, dockerContainer)
 			if err != nil {
 				glog.V(1).Infof("health check errored: %v", err)
 				continue
@@ -592,7 +602,7 @@ func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) {
 	return kl.cadvisorClient.MachineInfo()
 }
 
-func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
+func (kl *Kubelet) healthy(currentState api.PodState, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
 	// Give the container 60 seconds to start up.
 	if container.LivenessProbe == nil {
 		return health.Healthy, nil
@@ -603,7 +613,7 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC
 	if kl.healthChecker == nil {
 		return health.Healthy, nil
 	}
-	return kl.healthChecker.HealthCheck(container)
+	return kl.healthChecker.HealthCheck(currentState, container)
 }
 
 // Returns logs of current machine.
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 48501845f7d..1d8def44fd4 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -67,6 +67,8 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
 }
 
 func verifyCalls(t *testing.T, fakeDocker *FakeDockerClient, calls []string) {
+	fakeDocker.lock.Lock()
+	defer fakeDocker.lock.Unlock()
 	verifyStringArrayEquals(t, fakeDocker.called, calls)
 }
 
@@ -318,7 +320,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
 		},
 	}, dockerContainers)
 	expectNoError(t, err)
-	verifyCalls(t, fakeDocker, []string{"stop"})
+	verifyCalls(t, fakeDocker, []string{"list", "stop"})
 
 	// Expect one of the duplicates to be killed.
 	if len(fakeDocker.stopped) != 1 || (len(fakeDocker.stopped) != 0 && fakeDocker.stopped[0] != "1234" && fakeDocker.stopped[0] != "4567") {
@@ -328,7 +330,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
 
 type FalseHealthChecker struct{}
 
-func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status, error) {
+func (f *FalseHealthChecker) HealthCheck(state api.PodState, container api.Container) (health.Status, error) {
 	return health.Unhealthy, nil
 }
 
@@ -363,7 +365,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
 		},
 	}, dockerContainers)
 	expectNoError(t, err)
-	verifyCalls(t, fakeDocker, []string{"stop", "create", "start"})
+	verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
 
 	// A map interation is used to delete containers, so must not depend on
 	// order here.