Add support for the PostStart event handler.

This commit is contained in:
Brendan Burns 2014-09-03 13:39:56 -07:00
parent 42eea82461
commit d0884accd7
4 changed files with 422 additions and 3 deletions

97
pkg/kubelet/handlers.go Normal file
View File

@ -0,0 +1,97 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"net"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
type execActionHandler struct {
kubelet *Kubelet
}
func (e *execActionHandler) Run(podFullName string, container *api.Container, handler *api.Handler) error {
_, err := e.kubelet.RunInContainer(podFullName, container.Name, handler.Exec.Command)
return err
}
type httpActionHandler struct {
kubelet *Kubelet
client httpGetInterface
}
// ResolvePort attempts to turn a IntOrString port reference into a concrete port number.
// If portReference has an int value, it is treated as a literal, and simply returns that value.
// If portReference is a string, an attempt is first made to parse it as an integer. If that fails,
// an attempt is made to find a port with the same name in the container spec.
// If a port with the same name is found, it's ContainerPort value is returned. If no matching
// port is found, an error is returned.
func ResolvePort(portReference util.IntOrString, container *api.Container) (int, error) {
if portReference.Kind == util.IntstrInt {
return portReference.IntVal, nil
} else {
portName := portReference.StrVal
port, err := strconv.Atoi(portName)
if err == nil {
return port, nil
}
for _, portSpec := range container.Ports {
if portSpec.Name == portName {
return portSpec.ContainerPort, nil
}
}
}
return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container)
}
func (h *httpActionHandler) Run(podFullName string, container *api.Container, handler *api.Handler) error {
host := handler.HTTPGet.Host
if len(host) == 0 {
var info api.PodInfo
info, err := h.kubelet.GetPodInfo(podFullName)
if err != nil {
glog.Errorf("unable to get pod info, event handlers may be invalid.")
return err
}
netInfo, found := info[networkContainerName]
if found && netInfo.NetworkSettings != nil {
host = netInfo.NetworkSettings.IPAddress
} else {
return fmt.Errorf("failed to find networking container: %v", info)
}
}
var port int
if handler.HTTPGet.Port.Kind == util.IntstrString && len(handler.HTTPGet.Port.StrVal) == 0 {
port = 80
} else {
var err error
port, err = ResolvePort(handler.HTTPGet.Port, container)
if err != nil {
return err
}
}
url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path)
_, err := h.client.Get(url)
return err
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestResolvePortInt(t *testing.T) {
expected := 80
port, err := ResolvePort(util.IntOrString{Kind: util.IntstrInt, IntVal: expected}, &api.Container{})
if port != expected {
t.Errorf("expected: %d, saw: %d", port)
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
func TestResolvePortString(t *testing.T) {
expected := 80
name := "foo"
container := &api.Container{
Ports: []api.Port{
{Name: name, ContainerPort: expected},
},
}
port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
if port != expected {
t.Errorf("expected: %d, saw: %d", port)
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
func TestResolvePortStringUnknown(t *testing.T) {
expected := 80
name := "foo"
container := &api.Container{
Ports: []api.Port{
{Name: "bar", ContainerPort: expected},
},
}
port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
if port != -1 {
t.Errorf("expected: -1, saw: %d", port)
}
if err == nil {
t.Error("unexpected non-error")
}
}

View File

@ -76,6 +76,7 @@ func NewMainKubelet(
resyncInterval: ri,
podWorkers: newPodWorkers(),
runner: NewDockerContainerCommandRunner(),
httpClient: &http.Client{},
}
}
@ -95,6 +96,10 @@ type ContainerCommandRunner interface {
RunInContainer(containerID string, cmd []string) ([]byte, error)
}
type httpGetInterface interface {
Get(url string) (*http.Response, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
@ -115,6 +120,8 @@ type Kubelet struct {
logServer http.Handler
// Optional, defaults to simple Docker implementation
runner ContainerCommandRunner
// Optional, client for http requests, defaults to empty client
httpClient httpGetInterface
}
// Run starts the kubelet reacting to config updates
@ -283,6 +290,31 @@ func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volume
return podVolumes, nil
}
// A basic interface that knows how to execute handlers
type actionHandler interface {
Run(podFullName string, container *api.Container, handler *api.Handler) error
}
func (kl *Kubelet) newActionHandler(handler *api.Handler) actionHandler {
switch {
case handler.Exec != nil:
return &execActionHandler{kubelet: kl}
case handler.HTTPGet != nil:
return &httpActionHandler{client: kl.httpClient, kubelet: kl}
default:
glog.Errorf("Invalid handler: %v")
return nil
}
}
func (kl *Kubelet) runHandler(podFullName string, container *api.Container, handler *api.Handler) error {
actionHandler := kl.newActionHandler(handler)
if actionHandler == nil {
return fmt.Errorf("invalid handler")
}
return actionHandler.Run(podFullName, container, handler)
}
// Run a single container from a pod. Returns the docker container ID
func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) {
envVariables := makeEnvironmentVariables(container)
@ -311,14 +343,28 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
Binds: binds,
NetworkMode: netMode,
})
if err == nil && container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := kl.runHandler(GetPodFullName(pod), container, container.Lifecycle.PostStart)
if handlerErr != nil {
kl.killContainerByID(dockerContainer.ID, "")
return DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
return DockerID(dockerContainer.ID), err
}
// Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
glog.Infof("Killing: %s", dockerContainer.ID)
err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
podFullName, containerName, _ := parseDockerName(dockerContainer.Names[0])
return kl.killContainerByID(dockerContainer.ID, dockerContainer.Names[0])
}
func (kl *Kubelet) killContainerByID(ID, name string) error {
glog.Infof("Killing: %s", ID)
err := kl.dockerClient.StopContainer(ID, 10)
if len(name) == 0 {
return err
}
podFullName, containerName, _ := parseDockerName(name)
kl.LogEvent(&api.Event{
Event: "STOP",
Manifest: &api.ContainerManifest{

View File

@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"hash/adler32"
"net/http"
"reflect"
"regexp"
"strconv"
@ -30,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/fsouza/go-dockerclient"
"github.com/google/cadvisor/info"
@ -346,6 +348,59 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) {
fakeDocker.lock.Unlock()
}
func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
fakeHttp := fakeHTTP{}
kubelet.httpClient = &fakeHttp
fakeDocker.containerList = []docker.APIContainers{
{
// network container
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
err := kubelet.SyncPods([]Pod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{
Name: "bar",
Lifecycle: &api.Lifecycle{
PostStart: &api.Handler{
HTTPGet: &api.HTTPGetAction{
Host: "foo",
Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
Path: "bar",
},
},
},
},
},
},
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect", "create", "start"})
fakeDocker.lock.Lock()
if len(fakeDocker.Created) != 1 ||
!matchString(t, "k8s--bar\\.[a-f0-9]+--foo.test--", fakeDocker.Created[0]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.lock.Unlock()
if fakeHttp.url != "http://foo:8080/bar" {
t.Errorf("Unexpected handler: %s", fakeHttp.url)
}
}
func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{
@ -1126,3 +1181,155 @@ func TestParseImageName(t *testing.T) {
}
}
}
func TestRunHandlerExec(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner
containerID := "abc1234"
podName := "podFoo"
podNamespace := "etcd"
containerName := "containerFoo"
fakeDocker.containerList = []docker.APIContainers{
{
ID: containerID,
Names: []string{"/k8s--" + containerName + "--" + podName + "." + podNamespace + "--1234"},
},
}
container := api.Container{
Name: containerName,
Lifecycle: &api.Lifecycle{
PostStart: &api.Handler{
Exec: &api.ExecAction{
Command: []string{"ls", "-a"},
},
},
},
}
err := kubelet.runHandler(podName+"."+podNamespace, &container, container.Lifecycle.PostStart)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fakeCommandRunner.ID != containerID ||
!reflect.DeepEqual(container.Lifecycle.PostStart.Exec.Command, fakeCommandRunner.Cmd) {
t.Errorf("unexpected commands: %v", fakeCommandRunner)
}
}
type fakeHTTP struct {
url string
err error
}
func (f *fakeHTTP) Get(url string) (*http.Response, error) {
f.url = url
return nil, f.err
}
func TestRunHandlerHttp(t *testing.T) {
fakeHttp := fakeHTTP{}
kubelet, _, _ := newTestKubelet(t)
kubelet.httpClient = &fakeHttp
podName := "podFoo"
podNamespace := "etcd"
containerName := "containerFoo"
container := api.Container{
Name: containerName,
Lifecycle: &api.Lifecycle{
PostStart: &api.Handler{
HTTPGet: &api.HTTPGetAction{
Host: "foo",
Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
Path: "bar",
},
},
},
}
err := kubelet.runHandler(podName+"."+podNamespace, &container, container.Lifecycle.PostStart)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fakeHttp.url != "http://foo:8080/bar" {
t.Errorf("unexpected url: %s", fakeHttp.url)
}
}
func TestNewHandler(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
handler := &api.Handler{
HTTPGet: &api.HTTPGetAction{
Host: "foo",
Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
Path: "bar",
},
}
actionHandler := kubelet.newActionHandler(handler)
if actionHandler == nil {
t.Error("unexpected nil action handler.")
}
handler = &api.Handler{
Exec: &api.ExecAction{
Command: []string{"ls", "-l"},
},
}
actionHandler = kubelet.newActionHandler(handler)
if actionHandler == nil {
t.Error("unexpected nil action handler.")
}
handler = &api.Handler{}
actionHandler = kubelet.newActionHandler(handler)
if actionHandler != nil {
t.Errorf("unexpected non-nil action handler: %v", actionHandler)
}
}
func TestSyncPodEventHandlerFails(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.httpClient = &fakeHTTP{
err: fmt.Errorf("test error"),
}
dockerContainers := DockerContainers{
"9876": &docker.APIContainers{
// network container
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
err := kubelet.syncPod(&Pod{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar",
Lifecycle: &api.Lifecycle{
PostStart: &api.Handler{
HTTPGet: &api.HTTPGetAction{
Host: "does.no.exist",
Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
Path: "bar",
},
},
},
},
},
},
}, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "create", "start", "stop"})
if len(fakeDocker.stopped) != 1 {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
}
}