mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
kubelet: Refactor runHandler().
Use new HandlerRunner type to replace runHandler()
This commit is contained in:
parent
01b891770f
commit
c23b83b0ca
@ -22,32 +22,52 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type execActionHandler struct {
|
type HandlerRunner interface {
|
||||||
kubelet *Kubelet
|
Run(containerID string, pod *api.Pod, container *api.Container, handler *api.Handler) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *execActionHandler) Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
|
type handlerRunner struct {
|
||||||
_, err := e.kubelet.RunInContainer(podFullName, uid, container.Name, handler.Exec.Command)
|
httpGetter httpGetter
|
||||||
return err
|
commandRunner dockertools.ContainerCommandRunner
|
||||||
|
containerManager *dockertools.DockerManager
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpActionHandler struct {
|
// TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface.
|
||||||
kubelet *Kubelet
|
func NewHandlerRunner(httpGetter httpGetter, commandRunner dockertools.ContainerCommandRunner, containerManager *dockertools.DockerManager) *handlerRunner {
|
||||||
client httpGetter
|
return &handlerRunner{
|
||||||
|
httpGetter: httpGetter,
|
||||||
|
commandRunner: commandRunner,
|
||||||
|
containerManager: containerManager,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResolvePort attempts to turn a IntOrString port reference into a concrete port number.
|
// TODO(yifan): Use a strong type for containerID.
|
||||||
|
func (hr *handlerRunner) Run(containerID string, pod *api.Pod, container *api.Container, handler *api.Handler) error {
|
||||||
|
switch {
|
||||||
|
case handler.Exec != nil:
|
||||||
|
_, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command)
|
||||||
|
return err
|
||||||
|
case handler.HTTPGet != nil:
|
||||||
|
return hr.runHTTPHandler(pod, container, handler)
|
||||||
|
default:
|
||||||
|
err := fmt.Errorf("Invalid handler: %v", handler)
|
||||||
|
glog.Errorf("Cannot run handler: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolvePort attempts to turn a IntOrString port reference into a concrete port number.
|
||||||
// If portReference has an int value, it is treated as a literal, and simply returns that value.
|
// If portReference has an int value, it is treated as a literal, and simply returns that value.
|
||||||
// If portReference is a string, an attempt is first made to parse it as an integer. If that fails,
|
// If portReference is a string, an attempt is first made to parse it as an integer. If that fails,
|
||||||
// an attempt is made to find a port with the same name in the container spec.
|
// an attempt is made to find a port with the same name in the container spec.
|
||||||
// If a port with the same name is found, it's ContainerPort value is returned. If no matching
|
// If a port with the same name is found, it's ContainerPort value is returned. If no matching
|
||||||
// port is found, an error is returned.
|
// port is found, an error is returned.
|
||||||
func ResolvePort(portReference util.IntOrString, container *api.Container) (int, error) {
|
func resolvePort(portReference util.IntOrString, container *api.Container) (int, error) {
|
||||||
if portReference.Kind == util.IntstrInt {
|
if portReference.Kind == util.IntstrInt {
|
||||||
return portReference.IntVal, nil
|
return portReference.IntVal, nil
|
||||||
} else {
|
} else {
|
||||||
@ -66,10 +86,10 @@ func ResolvePort(portReference util.IntOrString, container *api.Container) (int,
|
|||||||
return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container)
|
return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
|
func (hr *handlerRunner) runHTTPHandler(pod *api.Pod, container *api.Container, handler *api.Handler) error {
|
||||||
host := handler.HTTPGet.Host
|
host := handler.HTTPGet.Host
|
||||||
if len(host) == 0 {
|
if len(host) == 0 {
|
||||||
status, err := h.kubelet.GetPodStatus(podFullName)
|
status, err := hr.containerManager.GetPodStatus(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to get pod info, event handlers may be invalid.")
|
glog.Errorf("Unable to get pod info, event handlers may be invalid.")
|
||||||
return err
|
return err
|
||||||
@ -84,12 +104,12 @@ func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *ap
|
|||||||
port = 80
|
port = 80
|
||||||
} else {
|
} else {
|
||||||
var err error
|
var err error
|
||||||
port, err = ResolvePort(handler.HTTPGet.Port, container)
|
port, err = resolvePort(handler.HTTPGet.Port, container)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path)
|
url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path)
|
||||||
_, err := h.client.Get(url)
|
_, err := hr.httpGetter.Get(url)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
|
|
||||||
func TestResolvePortInt(t *testing.T) {
|
func TestResolvePortInt(t *testing.T) {
|
||||||
expected := 80
|
expected := 80
|
||||||
port, err := ResolvePort(util.IntOrString{Kind: util.IntstrInt, IntVal: expected}, &api.Container{})
|
port, err := resolvePort(util.IntOrString{Kind: util.IntstrInt, IntVal: expected}, &api.Container{})
|
||||||
if port != expected {
|
if port != expected {
|
||||||
t.Errorf("expected: %d, saw: %d", expected, port)
|
t.Errorf("expected: %d, saw: %d", expected, port)
|
||||||
}
|
}
|
||||||
@ -42,7 +42,7 @@ func TestResolvePortString(t *testing.T) {
|
|||||||
{Name: name, ContainerPort: expected},
|
{Name: name, ContainerPort: expected},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
|
port, err := resolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
|
||||||
if port != expected {
|
if port != expected {
|
||||||
t.Errorf("expected: %d, saw: %d", expected, port)
|
t.Errorf("expected: %d, saw: %d", expected, port)
|
||||||
}
|
}
|
||||||
@ -59,7 +59,7 @@ func TestResolvePortStringUnknown(t *testing.T) {
|
|||||||
{Name: "bar", ContainerPort: expected},
|
{Name: "bar", ContainerPort: expected},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
|
port, err := resolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
|
||||||
if port != -1 {
|
if port != -1 {
|
||||||
t.Errorf("expected: -1, saw: %d", port)
|
t.Errorf("expected: -1, saw: %d", port)
|
||||||
}
|
}
|
||||||
|
@ -233,6 +233,7 @@ func NewMainKubelet(
|
|||||||
|
|
||||||
klet.podManager = newBasicPodManager(klet.kubeClient)
|
klet.podManager = newBasicPodManager(klet.kubeClient)
|
||||||
klet.prober = NewProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
|
klet.prober = NewProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
|
||||||
|
klet.handlerRunner = NewHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
|
||||||
|
|
||||||
runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager)
|
runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -317,6 +318,10 @@ type Kubelet struct {
|
|||||||
|
|
||||||
// Healthy check prober.
|
// Healthy check prober.
|
||||||
prober *Prober
|
prober *Prober
|
||||||
|
|
||||||
|
// Container lifecycle handler runner.
|
||||||
|
handlerRunner HandlerRunner
|
||||||
|
|
||||||
// Container readiness state manager.
|
// Container readiness state manager.
|
||||||
readinessManager *kubecontainer.ReadinessManager
|
readinessManager *kubecontainer.ReadinessManager
|
||||||
|
|
||||||
@ -597,31 +602,6 @@ func makeBinds(container *api.Container, podVolumes volumeMap) []string {
|
|||||||
return binds
|
return binds
|
||||||
}
|
}
|
||||||
|
|
||||||
// A basic interface that knows how to execute handlers
|
|
||||||
type actionHandler interface {
|
|
||||||
Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kl *Kubelet) newActionHandler(handler *api.Handler) actionHandler {
|
|
||||||
switch {
|
|
||||||
case handler.Exec != nil:
|
|
||||||
return &execActionHandler{kubelet: kl}
|
|
||||||
case handler.HTTPGet != nil:
|
|
||||||
return &httpActionHandler{client: kl.httpClient, kubelet: kl}
|
|
||||||
default:
|
|
||||||
glog.Errorf("Invalid handler: %v", handler)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kl *Kubelet) runHandler(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
|
|
||||||
actionHandler := kl.newActionHandler(handler)
|
|
||||||
if actionHandler == nil {
|
|
||||||
return fmt.Errorf("invalid handler")
|
|
||||||
}
|
|
||||||
return actionHandler.Run(podFullName, uid, container, handler)
|
|
||||||
}
|
|
||||||
|
|
||||||
// generateRunContainerOptions generates the RunContainerOptions, which can be used by
|
// generateRunContainerOptions generates the RunContainerOptions, which can be used by
|
||||||
// the container runtime to set parameters for launching a container.
|
// the container runtime to set parameters for launching a container.
|
||||||
func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
|
func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
|
||||||
@ -677,7 +657,7 @@ func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolum
|
|||||||
}
|
}
|
||||||
|
|
||||||
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
|
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
|
||||||
handlerErr := kl.runHandler(kubecontainer.GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
|
handlerErr := kl.handlerRunner.Run(id, pod, container, container.Lifecycle.PostStart)
|
||||||
if handlerErr != nil {
|
if handlerErr != nil {
|
||||||
kl.killContainerByID(id)
|
kl.killContainerByID(id)
|
||||||
return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
|
return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
|
||||||
|
@ -114,6 +114,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
fakeRecorder)
|
fakeRecorder)
|
||||||
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
||||||
kubelet.prober = NewProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
|
kubelet.prober = NewProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
|
||||||
|
kubelet.handlerRunner = NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
||||||
|
|
||||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
||||||
}
|
}
|
||||||
@ -767,6 +768,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
|||||||
waitGroup := testKubelet.waitGroup
|
waitGroup := testKubelet.waitGroup
|
||||||
fakeHttp := fakeHTTP{}
|
fakeHttp := fakeHTTP{}
|
||||||
kubelet.httpClient = &fakeHttp
|
kubelet.httpClient = &fakeHttp
|
||||||
|
kubelet.handlerRunner = NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
||||||
pods := []*api.Pod{
|
pods := []*api.Pod{
|
||||||
{
|
{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
@ -1692,6 +1694,7 @@ func TestRunHandlerExec(t *testing.T) {
|
|||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
fakeDocker := testKubelet.fakeDocker
|
fakeDocker := testKubelet.fakeDocker
|
||||||
kubelet.runner = &fakeCommandRunner
|
kubelet.runner = &fakeCommandRunner
|
||||||
|
kubelet.handlerRunner = NewHandlerRunner(&fakeHTTP{}, kubelet.runner, kubelet.containerManager)
|
||||||
|
|
||||||
containerID := "abc1234"
|
containerID := "abc1234"
|
||||||
podName := "podFoo"
|
podName := "podFoo"
|
||||||
@ -1715,7 +1718,12 @@ func TestRunHandlerExec(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err := kubelet.runHandler(podName+"_"+podNamespace, "", &container, container.Lifecycle.PostStart)
|
|
||||||
|
pod := api.Pod{}
|
||||||
|
pod.ObjectMeta.Name = podName
|
||||||
|
pod.ObjectMeta.Namespace = podNamespace
|
||||||
|
pod.Spec.Containers = []api.Container{container}
|
||||||
|
err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -1741,7 +1749,9 @@ func TestRunHandlerHttp(t *testing.T) {
|
|||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
kubelet.httpClient = &fakeHttp
|
kubelet.httpClient = &fakeHttp
|
||||||
|
kubelet.handlerRunner = NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
||||||
|
|
||||||
|
containerID := "abc1234"
|
||||||
podName := "podFoo"
|
podName := "podFoo"
|
||||||
podNamespace := "nsFoo"
|
podNamespace := "nsFoo"
|
||||||
containerName := "containerFoo"
|
containerName := "containerFoo"
|
||||||
@ -1758,7 +1768,12 @@ func TestRunHandlerHttp(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err := kubelet.runHandler(podName+"_"+podNamespace, "", &container, container.Lifecycle.PostStart)
|
pod := api.Pod{}
|
||||||
|
pod.ObjectMeta.Name = podName
|
||||||
|
pod.ObjectMeta.Namespace = podNamespace
|
||||||
|
pod.Spec.Containers = []api.Container{container}
|
||||||
|
err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -1767,35 +1782,28 @@ func TestRunHandlerHttp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewHandler(t *testing.T) {
|
func TestRunHandlerNil(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
handler := &api.Handler{
|
|
||||||
HTTPGet: &api.HTTPGetAction{
|
containerID := "abc1234"
|
||||||
Host: "foo",
|
podName := "podFoo"
|
||||||
Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
|
podNamespace := "nsFoo"
|
||||||
Path: "bar",
|
containerName := "containerFoo"
|
||||||
|
|
||||||
|
container := api.Container{
|
||||||
|
Name: containerName,
|
||||||
|
Lifecycle: &api.Lifecycle{
|
||||||
|
PostStart: &api.Handler{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
actionHandler := kubelet.newActionHandler(handler)
|
pod := api.Pod{}
|
||||||
if actionHandler == nil {
|
pod.ObjectMeta.Name = podName
|
||||||
t.Error("unexpected nil action handler.")
|
pod.ObjectMeta.Namespace = podNamespace
|
||||||
}
|
pod.Spec.Containers = []api.Container{container}
|
||||||
|
err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
||||||
handler = &api.Handler{
|
if err == nil {
|
||||||
Exec: &api.ExecAction{
|
t.Errorf("expect error, but got nil")
|
||||||
Command: []string{"ls", "-l"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
actionHandler = kubelet.newActionHandler(handler)
|
|
||||||
if actionHandler == nil {
|
|
||||||
t.Error("unexpected nil action handler.")
|
|
||||||
}
|
|
||||||
|
|
||||||
handler = &api.Handler{}
|
|
||||||
actionHandler = kubelet.newActionHandler(handler)
|
|
||||||
if actionHandler != nil {
|
|
||||||
t.Errorf("unexpected non-nil action handler: %v", actionHandler)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1809,6 +1817,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
|||||||
kubelet.httpClient = &fakeHTTP{
|
kubelet.httpClient = &fakeHTTP{
|
||||||
err: fmt.Errorf("test error"),
|
err: fmt.Errorf("test error"),
|
||||||
}
|
}
|
||||||
|
kubelet.handlerRunner = NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
||||||
|
|
||||||
pods := []*api.Pod{
|
pods := []*api.Pod{
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user