diff --git a/pkg/kubelet/handlers.go b/pkg/kubelet/handlers.go new file mode 100644 index 00000000000..8ae6f7e0771 --- /dev/null +++ b/pkg/kubelet/handlers.go @@ -0,0 +1,97 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "net" + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" +) + +type execActionHandler struct { + kubelet *Kubelet +} + +func (e *execActionHandler) Run(podFullName string, container *api.Container, handler *api.Handler) error { + _, err := e.kubelet.RunInContainer(podFullName, container.Name, handler.Exec.Command) + return err +} + +type httpActionHandler struct { + kubelet *Kubelet + client httpGetInterface +} + +// ResolvePort attempts to turn a IntOrString port reference into a concrete port number. +// If portReference has an int value, it is treated as a literal, and simply returns that value. +// If portReference is a string, an attempt is first made to parse it as an integer. If that fails, +// an attempt is made to find a port with the same name in the container spec. +// If a port with the same name is found, it's ContainerPort value is returned. If no matching +// port is found, an error is returned. +func ResolvePort(portReference util.IntOrString, container *api.Container) (int, error) { + if portReference.Kind == util.IntstrInt { + return portReference.IntVal, nil + } else { + portName := portReference.StrVal + port, err := strconv.Atoi(portName) + if err == nil { + return port, nil + } + for _, portSpec := range container.Ports { + if portSpec.Name == portName { + return portSpec.ContainerPort, nil + } + } + + } + return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container) +} + +func (h *httpActionHandler) Run(podFullName string, container *api.Container, handler *api.Handler) error { + host := handler.HTTPGet.Host + if len(host) == 0 { + var info api.PodInfo + info, err := h.kubelet.GetPodInfo(podFullName) + if err != nil { + glog.Errorf("unable to get pod info, event handlers may be invalid.") + return err + } + netInfo, found := info[networkContainerName] + if found && netInfo.NetworkSettings != nil { + host = netInfo.NetworkSettings.IPAddress + } else { + return fmt.Errorf("failed to find networking container: %v", info) + } + } + var port int + if handler.HTTPGet.Port.Kind == util.IntstrString && len(handler.HTTPGet.Port.StrVal) == 0 { + port = 80 + } else { + var err error + port, err = ResolvePort(handler.HTTPGet.Port, container) + if err != nil { + return err + } + } + url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path) + _, err := h.client.Get(url) + return err +} diff --git a/pkg/kubelet/handlers_test.go b/pkg/kubelet/handlers_test.go new file mode 100644 index 00000000000..006d819ccc9 --- /dev/null +++ b/pkg/kubelet/handlers_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func TestResolvePortInt(t *testing.T) { + expected := 80 + port, err := ResolvePort(util.IntOrString{Kind: util.IntstrInt, IntVal: expected}, &api.Container{}) + if port != expected { + t.Errorf("expected: %d, saw: %d", port) + } + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + +func TestResolvePortString(t *testing.T) { + expected := 80 + name := "foo" + container := &api.Container{ + Ports: []api.Port{ + {Name: name, ContainerPort: expected}, + }, + } + port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container) + if port != expected { + t.Errorf("expected: %d, saw: %d", port) + } + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + +func TestResolvePortStringUnknown(t *testing.T) { + expected := 80 + name := "foo" + container := &api.Container{ + Ports: []api.Port{ + {Name: "bar", ContainerPort: expected}, + }, + } + port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container) + if port != -1 { + t.Errorf("expected: -1, saw: %d", port) + } + if err == nil { + t.Error("unexpected non-error") + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ec59b2a229b..b6f0c5ad9fc 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -76,6 +76,7 @@ func NewMainKubelet( resyncInterval: ri, podWorkers: newPodWorkers(), runner: NewDockerContainerCommandRunner(), + httpClient: &http.Client{}, } } @@ -95,6 +96,10 @@ type ContainerCommandRunner interface { RunInContainer(containerID string, cmd []string) ([]byte, error) } +type httpGetInterface interface { + Get(url string) (*http.Response, error) +} + // Kubelet is the main kubelet implementation. type Kubelet struct { hostname string @@ -115,6 +120,8 @@ type Kubelet struct { logServer http.Handler // Optional, defaults to simple Docker implementation runner ContainerCommandRunner + // Optional, client for http requests, defaults to empty client + httpClient httpGetInterface } // Run starts the kubelet reacting to config updates @@ -283,6 +290,31 @@ func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volume return podVolumes, nil } +// A basic interface that knows how to execute handlers +type actionHandler interface { + Run(podFullName string, container *api.Container, handler *api.Handler) error +} + +func (kl *Kubelet) newActionHandler(handler *api.Handler) actionHandler { + switch { + case handler.Exec != nil: + return &execActionHandler{kubelet: kl} + case handler.HTTPGet != nil: + return &httpActionHandler{client: kl.httpClient, kubelet: kl} + default: + glog.Errorf("Invalid handler: %v") + return nil + } +} + +func (kl *Kubelet) runHandler(podFullName string, container *api.Container, handler *api.Handler) error { + actionHandler := kl.newActionHandler(handler) + if actionHandler == nil { + return fmt.Errorf("invalid handler") + } + return actionHandler.Run(podFullName, container, handler) +} + // Run a single container from a pod. Returns the docker container ID func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) { envVariables := makeEnvironmentVariables(container) @@ -311,14 +343,28 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v Binds: binds, NetworkMode: netMode, }) + if err == nil && container.Lifecycle != nil && container.Lifecycle.PostStart != nil { + handlerErr := kl.runHandler(GetPodFullName(pod), container, container.Lifecycle.PostStart) + if handlerErr != nil { + kl.killContainerByID(dockerContainer.ID, "") + return DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr) + } + } return DockerID(dockerContainer.ID), err } // Kill a docker container func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error { - glog.Infof("Killing: %s", dockerContainer.ID) - err := kl.dockerClient.StopContainer(dockerContainer.ID, 10) - podFullName, containerName, _ := parseDockerName(dockerContainer.Names[0]) + return kl.killContainerByID(dockerContainer.ID, dockerContainer.Names[0]) +} + +func (kl *Kubelet) killContainerByID(ID, name string) error { + glog.Infof("Killing: %s", ID) + err := kl.dockerClient.StopContainer(ID, 10) + if len(name) == 0 { + return err + } + podFullName, containerName, _ := parseDockerName(name) kl.LogEvent(&api.Event{ Event: "STOP", Manifest: &api.ContainerManifest{ diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index bdbeeb9e886..92a4eac5274 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "hash/adler32" + "net/http" "reflect" "regexp" "strconv" @@ -30,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" "github.com/fsouza/go-dockerclient" "github.com/google/cadvisor/info" @@ -346,6 +348,59 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) { fakeDocker.lock.Unlock() } +func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) { + kubelet, _, fakeDocker := newTestKubelet(t) + fakeHttp := fakeHTTP{} + kubelet.httpClient = &fakeHttp + fakeDocker.containerList = []docker.APIContainers{ + { + // network container + Names: []string{"/k8s--net--foo.test--"}, + ID: "9876", + }, + } + err := kubelet.SyncPods([]Pod{ + { + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + { + Name: "bar", + Lifecycle: &api.Lifecycle{ + PostStart: &api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Host: "foo", + Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt}, + Path: "bar", + }, + }, + }, + }, + }, + }, + }, + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + kubelet.drainWorkers() + + verifyCalls(t, fakeDocker, []string{ + "list", "list", "list", "inspect", "create", "start"}) + + fakeDocker.lock.Lock() + if len(fakeDocker.Created) != 1 || + !matchString(t, "k8s--bar\\.[a-f0-9]+--foo.test--", fakeDocker.Created[0]) { + t.Errorf("Unexpected containers created %v", fakeDocker.Created) + } + fakeDocker.lock.Unlock() + if fakeHttp.url != "http://foo:8080/bar" { + t.Errorf("Unexpected handler: %s", fakeHttp.url) + } +} + func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) { kubelet, _, fakeDocker := newTestKubelet(t) fakeDocker.containerList = []docker.APIContainers{ @@ -1126,3 +1181,155 @@ func TestParseImageName(t *testing.T) { } } } + +func TestRunHandlerExec(t *testing.T) { + fakeCommandRunner := fakeContainerCommandRunner{} + kubelet, _, fakeDocker := newTestKubelet(t) + kubelet.runner = &fakeCommandRunner + + containerID := "abc1234" + podName := "podFoo" + podNamespace := "etcd" + containerName := "containerFoo" + + fakeDocker.containerList = []docker.APIContainers{ + { + ID: containerID, + Names: []string{"/k8s--" + containerName + "--" + podName + "." + podNamespace + "--1234"}, + }, + } + + container := api.Container{ + Name: containerName, + Lifecycle: &api.Lifecycle{ + PostStart: &api.Handler{ + Exec: &api.ExecAction{ + Command: []string{"ls", "-a"}, + }, + }, + }, + } + err := kubelet.runHandler(podName+"."+podNamespace, &container, container.Lifecycle.PostStart) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if fakeCommandRunner.ID != containerID || + !reflect.DeepEqual(container.Lifecycle.PostStart.Exec.Command, fakeCommandRunner.Cmd) { + t.Errorf("unexpected commands: %v", fakeCommandRunner) + } +} + +type fakeHTTP struct { + url string + err error +} + +func (f *fakeHTTP) Get(url string) (*http.Response, error) { + f.url = url + return nil, f.err +} + +func TestRunHandlerHttp(t *testing.T) { + fakeHttp := fakeHTTP{} + + kubelet, _, _ := newTestKubelet(t) + kubelet.httpClient = &fakeHttp + + podName := "podFoo" + podNamespace := "etcd" + containerName := "containerFoo" + + container := api.Container{ + Name: containerName, + Lifecycle: &api.Lifecycle{ + PostStart: &api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Host: "foo", + Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt}, + Path: "bar", + }, + }, + }, + } + err := kubelet.runHandler(podName+"."+podNamespace, &container, container.Lifecycle.PostStart) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if fakeHttp.url != "http://foo:8080/bar" { + t.Errorf("unexpected url: %s", fakeHttp.url) + } +} + +func TestNewHandler(t *testing.T) { + kubelet, _, _ := newTestKubelet(t) + handler := &api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Host: "foo", + Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt}, + Path: "bar", + }, + } + actionHandler := kubelet.newActionHandler(handler) + if actionHandler == nil { + t.Error("unexpected nil action handler.") + } + + handler = &api.Handler{ + Exec: &api.ExecAction{ + Command: []string{"ls", "-l"}, + }, + } + actionHandler = kubelet.newActionHandler(handler) + if actionHandler == nil { + t.Error("unexpected nil action handler.") + } + + handler = &api.Handler{} + actionHandler = kubelet.newActionHandler(handler) + if actionHandler != nil { + t.Errorf("unexpected non-nil action handler: %v", actionHandler) + } +} + +func TestSyncPodEventHandlerFails(t *testing.T) { + kubelet, _, fakeDocker := newTestKubelet(t) + kubelet.httpClient = &fakeHTTP{ + err: fmt.Errorf("test error"), + } + dockerContainers := DockerContainers{ + "9876": &docker.APIContainers{ + // network container + Names: []string{"/k8s--net--foo.test--"}, + ID: "9876", + }, + } + err := kubelet.syncPod(&Pod{ + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar", + Lifecycle: &api.Lifecycle{ + PostStart: &api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Host: "does.no.exist", + Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt}, + Path: "bar", + }, + }, + }, + }, + }, + }, + }, dockerContainers) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + verifyCalls(t, fakeDocker, []string{"list", "create", "start", "stop"}) + + if len(fakeDocker.stopped) != 1 { + t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped) + } +}