Align lifecycle handlers and probes

Align the behavior of HTTP-based lifecycle handlers and HTTP-based
probers, converging on the probers implementation. This fixes multiple
deficiencies in the current implementation of lifecycle handlers
surrounding what functionality is available.

The functionality is gated by the features.ConsistentHTTPGetHandlers feature gate.
This commit is contained in:
Jason Simmons 2019-11-27 13:15:25 -05:00 committed by Billie Cleek
parent 429f71d958
commit 5a6acf85fa
22 changed files with 1044 additions and 248 deletions

View File

@ -192,6 +192,12 @@ const (
// Enables container Checkpoint support in the kubelet
ContainerCheckpoint featuregate.Feature = "ContainerCheckpoint"
// owner: @bhcleek @wzshiming
// GA: v1.25
//
// Normalize HttpGet URL and Header passing for lifecycle handlers with probers.
ConsistentHTTPGetHandlers featuregate.Feature = "ConsistentHTTPGetHandlers"
// owner: @jiahuif
// alpha: v1.21
// beta: v1.22
@ -842,6 +848,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},
ConsistentHTTPGetHandlers: {Default: true, PreRelease: featuregate.GA},
ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
CronJobTimeZone: {Default: true, PreRelease: featuregate.Beta},

View File

@ -110,6 +110,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
@ -475,7 +476,20 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
clusterDNS = append(clusterDNS, ip)
}
}
httpClient := &http.Client{}
// A TLS transport is needed to make HTTPS-based container lifecycle requests,
// but we do not have the information necessary to do TLS verification.
//
// This client must not be modified to include credentials, because it is
// critical that credentials not leak from the client to arbitrary hosts.
insecureContainerLifecycleHTTPClient := &http.Client{}
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
insecureTLSTransport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
insecureContainerLifecycleHTTPClient.Transport = insecureTLSTransport
insecureContainerLifecycleHTTPClient.CheckRedirect = httpprobe.RedirectChecker(false)
}
klet := &Kubelet{
hostname: hostname,
@ -625,7 +639,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
httpClient,
insecureContainerLifecycleHTTPClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),

View File

@ -46,12 +46,12 @@ const (
)
type fakeHTTP struct {
url string
req *http.Request
err error
}
func (f *fakeHTTP) Get(url string) (*http.Response, error) {
f.url = url
func (f *fakeHTTP) Do(req *http.Request) (*http.Response, error) {
f.req = req
return nil, f.err
}

View File

@ -23,11 +23,18 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimetesting "k8s.io/cri-api/pkg/apis/testing"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
type podStatusProviderFunc func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error)
func (f podStatusProviderFunc) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return f(uid, name, namespace)
}
func TestIsInitContainerFailed(t *testing.T) {
tests := []struct {
status *kubecontainer.Status

View File

@ -281,7 +281,8 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
if handlerErr != nil {
klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, "PostStartHook failed")
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
@ -586,10 +587,11 @@ func (m *kubeGenericRuntimeManager) executePreStopHook(pod *v1.Pod, containerID
go func() {
defer close(done)
defer utilruntime.HandleCrash()
if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
if _, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
klog.ErrorS(err, "PreStop hook failed", "pod", klog.KObj(pod), "podUID", pod.UID,
"containerName", containerSpec.Name, "containerID", containerID.String())
m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, msg)
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, "PreStopHook failed")
}
}()

View File

@ -28,9 +28,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -276,11 +281,21 @@ func TestLifeCycleHook(t *testing.T) {
fakeRunner := &containertest.FakeContainerCommandRunner{}
fakeHTTP := &fakeHTTP{}
fakePodStatusProvider := podStatusProviderFunc(func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return &kubecontainer.PodStatus{
ID: uid,
Name: name,
Namespace: namespace,
IPs: []string{
"127.0.0.1",
},
}, nil
})
lcHanlder := lifecycle.NewHandlerRunner(
fakeHTTP,
fakeRunner,
nil)
fakePodStatusProvider)
m.runner = lcHanlder
@ -295,13 +310,27 @@ func TestLifeCycleHook(t *testing.T) {
// Configured and working HTTP hook
t.Run("PreStop-HTTPGet", func(t *testing.T) {
defer func() { fakeHTTP.url = "" }()
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)
t.Run("inconsistent", func(t *testing.T) {
defer func() { fakeHTTP.req = nil }()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, false)()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.IntOrString{}
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)
if !strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
})
t.Run("consistent", func(t *testing.T) {
defer func() { fakeHTTP.req = nil }()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.FromInt(80)
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)
if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
})
})
// When there is no time to run PreStopHook
@ -313,8 +342,8 @@ func TestLifeCycleHook(t *testing.T) {
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriodLocal)
if strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Should not execute when gracePeriod is 0")
if fakeHTTP.req != nil {
t.Errorf("HTTP Prestop hook Should not execute when gracePeriod is 0")
}
})

View File

@ -176,7 +176,7 @@ func NewKubeGenericRuntimeManager(
podStateProvider podStateProvider,
osInterface kubecontainer.OSInterface,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HTTPGetter,
insecureContainerLifecycleHTTPClient types.HTTPDoer,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
imagePullQPS float32,
@ -265,7 +265,7 @@ func NewKubeGenericRuntimeManager(
serializeImagePulls,
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
kubeRuntimeManager.podStateProvider = podStateProvider

View File

@ -18,6 +18,7 @@ package lifecycle
import (
"fmt"
"io"
"net"
"net/http"
"strconv"
@ -25,12 +26,14 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
utilio "k8s.io/utils/io"
)
const (
@ -38,7 +41,7 @@ const (
)
type handlerRunner struct {
httpGetter kubetypes.HTTPGetter
httpDoer kubetypes.HTTPDoer
commandRunner kubecontainer.CommandRunner
containerManager podStatusProvider
}
@ -48,9 +51,9 @@ type podStatusProvider interface {
}
// NewHandlerRunner returns a configured lifecycle handler for a container.
func NewHandlerRunner(httpGetter kubetypes.HTTPGetter, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
func NewHandlerRunner(httpDoer kubetypes.HTTPDoer, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
return &handlerRunner{
httpGetter: httpGetter,
httpDoer: httpDoer,
commandRunner: commandRunner,
containerManager: containerManager,
}
@ -68,9 +71,10 @@ func (hr *handlerRunner) Run(containerID kubecontainer.ContainerID, pod *v1.Pod,
}
return msg, err
case handler.HTTPGet != nil:
msg, err := hr.runHTTPHandler(pod, container, handler)
err := hr.runHTTPHandler(pod, container, handler)
var msg string
if err != nil {
msg = fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v, message: %q", handler.HTTPGet.Path, container.Name, format.Pod(pod), err, msg)
msg = fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v", handler.HTTPGet.Path, container.Name, format.Pod(pod), err)
klog.V(1).ErrorS(err, "HTTP lifecycle hook for Container in Pod failed", "path", handler.HTTPGet.Path, "containerName", container.Name, "pod", klog.KObj(pod))
}
return msg, err
@ -105,19 +109,33 @@ func resolvePort(portReference intstr.IntOrString, container *v1.Container) (int
return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container)
}
func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler) (string, error) {
func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler) error {
host := handler.HTTPGet.Host
podIP := host
if len(host) == 0 {
status, err := hr.containerManager.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
klog.ErrorS(err, "Unable to get pod info, event handlers may be invalid.", "pod", klog.KObj(pod))
return "", err
return err
}
if len(status.IPs) == 0 {
return "", fmt.Errorf("failed to find networking container: %v", status)
return fmt.Errorf("failed to find networking container: %v", status)
}
host = status.IPs[0]
podIP = host
}
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
req, err := httpprobe.NewRequestForHTTPGetAction(handler.HTTPGet, container, podIP, "lifecycle")
if err != nil {
return err
}
resp, err := hr.httpDoer.Do(req)
discardHTTPRespBody(resp)
return err
}
// Deprecated code path.
var port int
if handler.HTTPGet.Port.Type == intstr.String && len(handler.HTTPGet.Port.StrVal) == 0 {
port = 80
@ -125,24 +143,34 @@ func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, ha
var err error
port, err = resolvePort(handler.HTTPGet.Port, container)
if err != nil {
return "", err
return err
}
}
url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path)
resp, err := hr.httpGetter.Get(url)
return getHTTPRespBody(resp), err
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
resp, err := hr.httpDoer.Do(req)
discardHTTPRespBody(resp)
return err
}
func getHTTPRespBody(resp *http.Response) string {
func discardHTTPRespBody(resp *http.Response) {
if resp == nil {
return ""
return
}
// Ensure the response body is fully read and closed
// before we reconnect, so that we reuse the same TCP
// connection.
defer resp.Body.Close()
bytes, err := utilio.ReadAtMost(resp.Body, maxRespBodyLength)
if err == nil || err == utilio.ErrLimitReached {
return string(bytes)
if resp.ContentLength <= maxRespBodyLength {
io.Copy(io.Discard, &io.LimitedReader{R: resp.Body, N: maxRespBodyLength})
}
return ""
}
// NewAppArmorAdmitHandler returns a PodAdmitHandler which is used to evaluate

View File

@ -25,8 +25,13 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
@ -89,6 +94,23 @@ func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID
return []byte(f.Msg), f.Err
}
func stubPodStatusProvider(podIP string) podStatusProvider {
return podStatusProviderFunc(func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return &kubecontainer.PodStatus{
ID: uid,
Name: name,
Namespace: namespace,
IPs: []string{podIP},
}, nil
})
}
type podStatusProviderFunc func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error)
func (f podStatusProviderFunc) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return f(uid, name, namespace)
}
func TestRunHandlerExec(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{}
handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeCommandRunner, nil)
@ -122,19 +144,22 @@ func TestRunHandlerExec(t *testing.T) {
}
type fakeHTTP struct {
url string
err error
resp *http.Response
url string
headers http.Header
err error
resp *http.Response
}
func (f *fakeHTTP) Get(url string) (*http.Response, error) {
f.url = url
func (f *fakeHTTP) Do(req *http.Request) (*http.Response, error) {
f.url = req.URL.String()
f.headers = req.Header.Clone()
return f.resp, f.err
}
func TestRunHandlerHttp(t *testing.T) {
fakeHTTPGetter := fakeHTTP{}
handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, nil)
fakePodStatusProvider := stubPodStatusProvider("127.0.0.1")
handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, fakePodStatusProvider)
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
containerName := "containerFoo"
@ -154,6 +179,7 @@ func TestRunHandlerHttp(t *testing.T) {
pod := v1.Pod{}
pod.ObjectMeta.Name = "podFoo"
pod.ObjectMeta.Namespace = "nsFoo"
pod.ObjectMeta.UID = "foo-bar-quux"
pod.Spec.Containers = []v1.Container{container}
_, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
@ -165,6 +191,462 @@ func TestRunHandlerHttp(t *testing.T) {
}
}
func TestRunHandlerHttpWithHeaders(t *testing.T) {
fakeHTTPDoer := fakeHTTP{}
fakePodStatusProvider := stubPodStatusProvider("127.0.0.1")
handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider)
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
containerName := "containerFoo"
container := v1.Container{
Name: containerName,
Lifecycle: &v1.Lifecycle{
PostStart: &v1.LifecycleHandler{
HTTPGet: &v1.HTTPGetAction{
Host: "foo",
Port: intstr.FromInt(8080),
Path: "/bar",
HTTPHeaders: []v1.HTTPHeader{
{Name: "Foo", Value: "bar"},
},
},
},
},
}
pod := v1.Pod{}
pod.ObjectMeta.Name = "podFoo"
pod.ObjectMeta.Namespace = "nsFoo"
pod.Spec.Containers = []v1.Container{container}
_, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fakeHTTPDoer.url != "http://foo:8080/bar" {
t.Errorf("unexpected url: %s", fakeHTTPDoer.url)
}
if fakeHTTPDoer.headers["Foo"][0] != "bar" {
t.Errorf("missing http header: %s", fakeHTTPDoer.headers)
}
}
func TestRunHandlerHttps(t *testing.T) {
fakeHTTPDoer := fakeHTTP{}
fakePodStatusProvider := stubPodStatusProvider("127.0.0.1")
handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider)
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
containerName := "containerFoo"
container := v1.Container{
Name: containerName,
Lifecycle: &v1.Lifecycle{
PostStart: &v1.LifecycleHandler{
HTTPGet: &v1.HTTPGetAction{
Scheme: v1.URISchemeHTTPS,
Host: "foo",
Path: "bar",
},
},
},
}
pod := v1.Pod{}
pod.ObjectMeta.Name = "podFoo"
pod.ObjectMeta.Namespace = "nsFoo"
pod.Spec.Containers = []v1.Container{container}
t.Run("consistent", func(t *testing.T) {
container.Lifecycle.PostStart.HTTPGet.Port = intstr.FromString("70")
pod.Spec.Containers = []v1.Container{container}
_, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fakeHTTPDoer.url != "https://foo:70/bar" {
t.Errorf("unexpected url: %s", fakeHTTPDoer.url)
}
})
t.Run("inconsistent", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, false)()
container.Lifecycle.PostStart.HTTPGet.Port = intstr.FromString("70")
pod.Spec.Containers = []v1.Container{container}
_, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fakeHTTPDoer.url != "http://foo:70/bar" {
t.Errorf("unexpected url: %q", fakeHTTPDoer.url)
}
})
}
func TestRunHandlerHTTPPort(t *testing.T) {
tests := []struct {
Name string
FeatureGateEnabled bool
Port intstr.IntOrString
ExpectError bool
Expected string
}{
{
Name: "consistent/with port",
FeatureGateEnabled: true,
Port: intstr.FromString("70"),
Expected: "https://foo:70/bar",
}, {
Name: "consistent/without port",
FeatureGateEnabled: true,
Port: intstr.FromString(""),
ExpectError: true,
}, {
Name: "inconsistent/with port",
FeatureGateEnabled: false,
Port: intstr.FromString("70"),
Expected: "http://foo:70/bar",
}, {
Name: "inconsistent/without port",
Port: intstr.FromString(""),
FeatureGateEnabled: false,
Expected: "http://foo:80/bar",
},
}
fakePodStatusProvider := stubPodStatusProvider("127.0.0.1")
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
containerName := "containerFoo"
container := v1.Container{
Name: containerName,
Lifecycle: &v1.Lifecycle{
PostStart: &v1.LifecycleHandler{
HTTPGet: &v1.HTTPGetAction{
Scheme: v1.URISchemeHTTPS,
Host: "foo",
Port: intstr.FromString("unexpected"),
Path: "bar",
},
},
},
}
pod := v1.Pod{}
pod.ObjectMeta.Name = "podFoo"
pod.ObjectMeta.Namespace = "nsFoo"
pod.Spec.Containers = []v1.Container{container}
for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, tt.FeatureGateEnabled)()
fakeHTTPDoer := fakeHTTP{}
handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider)
container.Lifecycle.PostStart.HTTPGet.Port = tt.Port
pod.Spec.Containers = []v1.Container{container}
_, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
if hasError := (err != nil); hasError != tt.ExpectError {
t.Errorf("unexpected error: %v", err)
}
if fakeHTTPDoer.url != tt.Expected {
t.Errorf("unexpected url: %s", fakeHTTPDoer.url)
}
})
}
}
func TestRunHTTPHandler(t *testing.T) {
type expected struct {
OldURL string
OldHeader http.Header
NewURL string
NewHeader http.Header
}
tests := []struct {
Name string
PodIP string
HTTPGet *v1.HTTPGetAction
Expected expected
}{
{
Name: "missing pod IP",
PodIP: "",
HTTPGet: &v1.HTTPGetAction{
Path: "foo",
Port: intstr.FromString("42"),
Host: "example.test",
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://example.test:42/foo",
OldHeader: http.Header{},
NewURL: "http://example.test:42/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "missing host",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo",
Port: intstr.FromString("42"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:42/foo",
OldHeader: http.Header{},
NewURL: "http://233.252.0.1:42/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "path with leading slash",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "/foo",
Port: intstr.FromString("42"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:42//foo",
OldHeader: http.Header{},
NewURL: "http://233.252.0.1:42/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "path without leading slash",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo",
Port: intstr.FromString("42"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:42/foo",
OldHeader: http.Header{},
NewURL: "http://233.252.0.1:42/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "port resolution",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo",
Port: intstr.FromString("quux"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:8080/foo",
OldHeader: http.Header{},
NewURL: "http://233.252.0.1:8080/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "https",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo",
Port: intstr.FromString("4430"),
Scheme: "https",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:4430/foo",
OldHeader: http.Header{},
NewURL: "https://233.252.0.1:4430/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "unknown scheme",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo",
Port: intstr.FromString("80"),
Scheme: "baz",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:80/foo",
OldHeader: http.Header{},
NewURL: "baz://233.252.0.1:80/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "query param",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo?k=v",
Port: intstr.FromString("80"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:80/foo?k=v",
OldHeader: http.Header{},
NewURL: "http://233.252.0.1:80/foo?k=v",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "fragment",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo#frag",
Port: intstr.FromString("80"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{},
},
Expected: expected{
OldURL: "http://233.252.0.1:80/foo#frag",
OldHeader: http.Header{},
NewURL: "http://233.252.0.1:80/foo#frag",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "headers",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Path: "foo",
Port: intstr.FromString("80"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{
{
Name: "Foo",
Value: "bar",
},
},
},
Expected: expected{
OldURL: "http://233.252.0.1:80/foo",
OldHeader: http.Header{},
NewURL: "http://233.252.0.1:80/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"Foo": {"bar"},
"User-Agent": {"kube-lifecycle/."},
},
},
}, {
Name: "host header",
PodIP: "233.252.0.1",
HTTPGet: &v1.HTTPGetAction{
Host: "example.test",
Path: "foo",
Port: intstr.FromString("80"),
Scheme: "http",
HTTPHeaders: []v1.HTTPHeader{
{
Name: "Host",
Value: "from.header",
},
},
},
Expected: expected{
OldURL: "http://example.test:80/foo",
OldHeader: http.Header{},
NewURL: "http://example.test:80/foo",
NewHeader: http.Header{
"Accept": {"*/*"},
"User-Agent": {"kube-lifecycle/."},
"Host": {"from.header"},
},
},
},
}
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
containerName := "containerFoo"
container := v1.Container{
Name: containerName,
Lifecycle: &v1.Lifecycle{
PostStart: &v1.LifecycleHandler{},
},
Ports: []v1.ContainerPort{
{
Name: "quux",
ContainerPort: 8080,
},
},
}
pod := v1.Pod{}
pod.ObjectMeta.Name = "podFoo"
pod.ObjectMeta.Namespace = "nsFoo"
pod.Spec.Containers = []v1.Container{container}
for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
fakePodStatusProvider := stubPodStatusProvider(tt.PodIP)
container.Lifecycle.PostStart.HTTPGet = tt.HTTPGet
pod.Spec.Containers = []v1.Container{container}
verify := func(t *testing.T, expectedHeader http.Header, expectedURL string) {
fakeHTTPDoer := fakeHTTP{}
handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider)
_, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(expectedHeader, fakeHTTPDoer.headers); diff != "" {
t.Errorf("unexpected header (-want, +got)\n:%s", diff)
}
if fakeHTTPDoer.url != expectedURL {
t.Errorf("url = %v; want %v", fakeHTTPDoer.url, tt.Expected.NewURL)
}
}
t.Run("consistent", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, true)()
verify(t, tt.Expected.NewHeader, tt.Expected.NewURL)
})
t.Run("inconsistent", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, false)()
verify(t, tt.Expected.OldHeader, tt.Expected.OldURL)
})
})
}
}
func TestRunHandlerNil(t *testing.T) {
handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, nil)
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
@ -228,7 +710,11 @@ func TestRunHandlerHttpFailure(t *testing.T) {
Body: io.NopCloser(strings.NewReader(expectedErr.Error())),
}
fakeHTTPGetter := fakeHTTP{err: expectedErr, resp: &expectedResp}
handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, nil)
fakePodStatusProvider := stubPodStatusProvider("127.0.0.1")
handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, fakePodStatusProvider)
containerName := "containerFoo"
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
container := v1.Container{
@ -247,7 +733,7 @@ func TestRunHandlerHttpFailure(t *testing.T) {
pod.ObjectMeta.Name = "podFoo"
pod.ObjectMeta.Namespace = "nsFoo"
pod.Spec.Containers = []v1.Container{container}
expectedErrMsg := fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v, message: %q", "bar", containerName, format.Pod(&pod), expectedErr, expectedErr.Error())
expectedErrMsg := fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v", "bar", containerName, format.Pod(&pod), expectedErr)
msg, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
if err == nil {
t.Errorf("expected error: %v", expectedErr)

View File

@ -19,15 +19,9 @@ package prober
import (
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
kubefeatures "k8s.io/kubernetes/pkg/features"
@ -140,16 +134,6 @@ func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.
return result, output, err
}
// buildHeaderMap takes a list of HTTPHeader <name, value> string
// pairs and returns a populated string->[]string http.Header map.
func buildHeader(headerList []v1.HTTPHeader) http.Header {
headers := make(http.Header)
for _, header := range headerList {
headers[header.Name] = append(headers[header.Name], header.Value)
}
return headers
}
func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.Exec != nil {
@ -158,24 +142,22 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
}
if p.HTTPGet != nil {
scheme := strings.ToLower(string(p.HTTPGet.Scheme))
host := p.HTTPGet.Host
if host == "" {
host = status.PodIP
}
port, err := extractPort(p.HTTPGet.Port, container)
req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
if err != nil {
return probe.Unknown, "", err
}
path := p.HTTPGet.Path
klog.V(4).InfoS("HTTP-Probe", "scheme", scheme, "host", host, "port", port, "path", path, "timeout", timeout)
url := formatURL(scheme, host, port, path)
headers := buildHeader(p.HTTPGet.HTTPHeaders)
klog.V(4).InfoS("HTTP-Probe Headers", "headers", headers)
return pb.http.Probe(url, headers, timeout)
if klogV4 := klog.V(4); klogV4.Enabled() {
port := req.URL.Port()
host := req.URL.Hostname()
path := req.URL.Path
scheme := req.URL.Scheme
headers := p.HTTPGet.HTTPHeaders
klogV4.InfoS("HTTP-Probe", "scheme", scheme, "host", host, "port", port, "path", path, "timeout", timeout, "headers", headers)
}
return pb.http.Probe(req, timeout)
}
if p.TCPSocket != nil {
port, err := extractPort(p.TCPSocket.Port, container)
port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
if err != nil {
return probe.Unknown, "", err
}
@ -198,52 +180,6 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}
func extractPort(param intstr.IntOrString, container v1.Container) (int, error) {
port := -1
var err error
switch param.Type {
case intstr.Int:
port = param.IntValue()
case intstr.String:
if port, err = findPortByName(container, param.StrVal); err != nil {
// Last ditch effort - maybe it was an int stored as string?
if port, err = strconv.Atoi(param.StrVal); err != nil {
return port, err
}
}
default:
return port, fmt.Errorf("intOrString had no kind: %+v", param)
}
if port > 0 && port < 65536 {
return port, nil
}
return port, fmt.Errorf("invalid port number: %v", port)
}
// findPortByName is a helper function to look up a port in a container by name.
func findPortByName(container v1.Container, portName string) (int, error) {
for _, port := range container.Ports {
if port.Name == portName {
return int(port.ContainerPort), nil
}
}
return 0, fmt.Errorf("port %s not found", portName)
}
// formatURL formats a URL from args. For testability.
func formatURL(scheme string, host string, port int, path string) *url.URL {
u, err := url.Parse(path)
// Something is busted with the path, but it's too late to reject it. Pass it along as is.
if err != nil {
u = &url.URL{
Path: path,
}
}
u.Scheme = scheme
u.Host = net.JoinHostPort(host, strconv.Itoa(port))
return u
}
type execInContainer struct {
// run executes a command in a container. Combined stdout and stderr output is always returned. An
// error is returned if one occurred.

View File

@ -20,7 +20,6 @@ import (
"bytes"
"errors"
"fmt"
"net/http"
"reflect"
"strings"
"testing"
@ -36,47 +35,6 @@ import (
execprobe "k8s.io/kubernetes/pkg/probe/exec"
)
func TestFormatURL(t *testing.T) {
testCases := []struct {
scheme string
host string
port int
path string
result string
}{
{"http", "localhost", 93, "", "http://localhost:93"},
{"https", "localhost", 93, "/path", "https://localhost:93/path"},
{"http", "localhost", 93, "?foo", "http://localhost:93?foo"},
{"https", "localhost", 93, "/path?bar", "https://localhost:93/path?bar"},
}
for _, test := range testCases {
url := formatURL(test.scheme, test.host, test.port, test.path)
if url.String() != test.result {
t.Errorf("Expected %s, got %s", test.result, url.String())
}
}
}
func TestFindPortByName(t *testing.T) {
container := v1.Container{
Ports: []v1.ContainerPort{
{
Name: "foo",
ContainerPort: 8080,
},
{
Name: "bar",
ContainerPort: 9000,
},
},
}
want := 8080
got, err := findPortByName(container, "foo")
if got != want || err != nil {
t.Errorf("Expected %v, got %v, err: %v", want, got, err)
}
}
func TestGetURLParts(t *testing.T) {
testCases := []struct {
probe *v1.HTTPGetAction
@ -114,7 +72,7 @@ func TestGetURLParts(t *testing.T) {
if host == "" {
host = state.PodIP
}
port, err := extractPort(test.probe.Port, container)
port, err := probe.ResolveContainerPort(test.probe.Port, &container)
if test.ok && err != nil {
t.Errorf("Unexpected error: %v", err)
}
@ -158,7 +116,7 @@ func TestGetTCPAddrParts(t *testing.T) {
},
},
}
port, err := extractPort(test.probe.Port, container)
port, err := probe.ResolveContainerPort(test.probe.Port, &container)
if !test.ok && err == nil {
t.Errorf("Expected error for %+v, got %s:%d", test, host, port)
}
@ -173,33 +131,6 @@ func TestGetTCPAddrParts(t *testing.T) {
}
}
func TestHTTPHeaders(t *testing.T) {
testCases := []struct {
input []v1.HTTPHeader
output http.Header
}{
{[]v1.HTTPHeader{}, http.Header{}},
{[]v1.HTTPHeader{
{Name: "X-Muffins-Or-Cupcakes", Value: "Muffins"},
}, http.Header{"X-Muffins-Or-Cupcakes": {"Muffins"}}},
{[]v1.HTTPHeader{
{Name: "X-Muffins-Or-Cupcakes", Value: "Muffins"},
{Name: "X-Muffins-Or-Plumcakes", Value: "Muffins!"},
}, http.Header{"X-Muffins-Or-Cupcakes": {"Muffins"},
"X-Muffins-Or-Plumcakes": {"Muffins!"}}},
{[]v1.HTTPHeader{
{Name: "X-Muffins-Or-Cupcakes", Value: "Muffins"},
{Name: "X-Muffins-Or-Cupcakes", Value: "Cupcakes, too"},
}, http.Header{"X-Muffins-Or-Cupcakes": {"Muffins", "Cupcakes, too"}}},
}
for _, test := range testCases {
headers := buildHeader(test.input)
if !reflect.DeepEqual(test.output, headers) {
t.Errorf("Expected %#v, got %#v", test.output, headers)
}
}
}
func TestProbe(t *testing.T) {
containerID := kubecontainer.ContainerID{Type: "test", ID: "foobar"}

View File

@ -26,10 +26,9 @@ import (
// TODO: Reconcile custom types in kubelet/types and this subpackage
// HTTPGetter is an interface representing the ability to perform HTTP GET requests.
type HTTPGetter interface {
// Get issues a GET to the specified URL.
Get(url string) (*http.Response, error)
// HTTPDoer encapsulates http.Do functionality
type HTTPDoer interface {
Do(req *http.Request) (*http.Response, error)
}
// Timestamp wraps around time.Time and offers utilities to format and parse

View File

@ -21,11 +21,9 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"time"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/component-base/version"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/klog/v2"
@ -63,7 +61,7 @@ func NewWithTLSConfig(config *tls.Config, followNonLocalRedirects bool) Prober {
// Prober is an interface that defines the Probe function for doing HTTP readiness/liveness checks.
type Prober interface {
Probe(url *url.URL, headers http.Header, timeout time.Duration) (probe.Result, string, error)
Probe(req *http.Request, timeout time.Duration) (probe.Result, string, error)
}
type httpProber struct {
@ -71,14 +69,14 @@ type httpProber struct {
followNonLocalRedirects bool
}
// Probe returns a probing result. The only case the err will not be nil is when there is a problem reading the response body.
func (pr httpProber) Probe(url *url.URL, headers http.Header, timeout time.Duration) (probe.Result, string, error) {
// Probe returns a ProbeRunner capable of running an HTTP check.
func (pr httpProber) Probe(req *http.Request, timeout time.Duration) (probe.Result, string, error) {
client := &http.Client{
Timeout: timeout,
Transport: pr.transport,
CheckRedirect: redirectChecker(pr.followNonLocalRedirects),
CheckRedirect: RedirectChecker(pr.followNonLocalRedirects),
}
return DoHTTPProbe(url, headers, client)
return DoHTTPProbe(req, client)
}
// GetHTTPInterface is an interface for making HTTP requests, that returns a response and error.
@ -90,29 +88,9 @@ type GetHTTPInterface interface {
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success.
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure.
// This is exported because some other packages may want to do direct HTTP probes.
func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
// Convert errors into failures to catch timeouts.
return probe.Failure, err.Error(), nil
}
if headers == nil {
headers = http.Header{}
}
if _, ok := headers["User-Agent"]; !ok {
// explicitly set User-Agent so it's not set to default Go value
v := version.Get()
headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor))
}
if _, ok := headers["Accept"]; !ok {
// Accept header was not defined. accept all
headers.Set("Accept", "*/*")
} else if headers.Get("Accept") == "" {
// Accept header was overridden but is empty. removing
headers.Del("Accept")
}
req.Header = headers
req.Host = headers.Get("Host")
func DoHTTPProbe(req *http.Request, client GetHTTPInterface) (probe.Result, string, error) {
url := req.URL
headers := req.Header
res, err := client.Do(req)
if err != nil {
// Convert errors into failures to catch timeouts.
@ -140,7 +118,8 @@ func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (pr
return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
}
func redirectChecker(followNonLocalRedirects bool) func(*http.Request, []*http.Request) error {
// RedirectChecker returns a function that can be used to check HTTP redirects.
func RedirectChecker(followNonLocalRedirects bool) func(*http.Request, []*http.Request) error {
if followNonLocalRedirects {
return nil // Use the default http client checker.
}

View File

@ -84,7 +84,13 @@ func TestHTTPProbeProxy(t *testing.T) {
if err != nil {
t.Errorf("proxy test unexpected error: %v", err)
}
_, response, _ := prober.Probe(url, http.Header{}, time.Second*3)
req, err := NewProbeRequest(url, http.Header{})
if err != nil {
t.Fatal(err)
}
_, response, _ := prober.Probe(req, time.Second*3)
if response == res {
t.Errorf("proxy test unexpected error: the probe is using proxy")
@ -376,7 +382,11 @@ func TestHTTPProbeChecker(t *testing.T) {
if err != nil {
t.Errorf("case %d: unexpected error: %v", i, err)
}
health, output, err := prober.Probe(u, test.reqHeaders, 1*time.Second)
req, err := NewProbeRequest(u, test.reqHeaders)
if err != nil {
t.Fatal(err)
}
health, output, err := prober.Probe(req, 1*time.Second)
if test.health == probe.Unknown && err == nil {
t.Errorf("case %d: expected error", i)
}
@ -436,7 +446,9 @@ func TestHTTPProbeChecker_NonLocalRedirects(t *testing.T) {
prober := New(followNonLocalRedirects)
target, err := url.Parse(server.URL + "/redirect?loc=" + url.QueryEscape(test.redirect))
require.NoError(t, err)
result, _, _ := prober.Probe(target, nil, wait.ForeverTestTimeout)
req, err := NewProbeRequest(target, nil)
require.NoError(t, err)
result, _, _ := prober.Probe(req, wait.ForeverTestTimeout)
assert.Equal(t, test.expectLocalResult, result)
})
t.Run(desc+"-nonlocal", func(t *testing.T) {
@ -444,7 +456,9 @@ func TestHTTPProbeChecker_NonLocalRedirects(t *testing.T) {
prober := New(followNonLocalRedirects)
target, err := url.Parse(server.URL + "/redirect?loc=" + url.QueryEscape(test.redirect))
require.NoError(t, err)
result, _, _ := prober.Probe(target, nil, wait.ForeverTestTimeout)
req, err := NewProbeRequest(target, nil)
require.NoError(t, err)
result, _, _ := prober.Probe(req, wait.ForeverTestTimeout)
assert.Equal(t, test.expectNonLocalResult, result)
})
}
@ -486,7 +500,9 @@ func TestHTTPProbeChecker_HostHeaderPreservedAfterRedirect(t *testing.T) {
prober := New(followNonLocalRedirects)
target, err := url.Parse(server.URL + "/redirect")
require.NoError(t, err)
result, _, _ := prober.Probe(target, headers, wait.ForeverTestTimeout)
req, err := NewProbeRequest(target, headers)
require.NoError(t, err)
result, _, _ := prober.Probe(req, wait.ForeverTestTimeout)
assert.Equal(t, test.expectedResult, result)
})
t.Run(desc+"nonlocal", func(t *testing.T) {
@ -494,7 +510,9 @@ func TestHTTPProbeChecker_HostHeaderPreservedAfterRedirect(t *testing.T) {
prober := New(followNonLocalRedirects)
target, err := url.Parse(server.URL + "/redirect")
require.NoError(t, err)
result, _, _ := prober.Probe(target, headers, wait.ForeverTestTimeout)
req, err := NewProbeRequest(target, headers)
require.NoError(t, err)
result, _, _ := prober.Probe(req, wait.ForeverTestTimeout)
assert.Equal(t, test.expectedResult, result)
})
}
@ -527,7 +545,9 @@ func TestHTTPProbeChecker_PayloadTruncated(t *testing.T) {
prober := New(false)
target, err := url.Parse(server.URL + "/success")
require.NoError(t, err)
result, body, err := prober.Probe(target, headers, wait.ForeverTestTimeout)
req, err := NewProbeRequest(target, headers)
require.NoError(t, err)
result, body, err := prober.Probe(req, wait.ForeverTestTimeout)
assert.NoError(t, err)
assert.Equal(t, probe.Success, result)
assert.Equal(t, string(truncatedPayload), body)
@ -560,7 +580,9 @@ func TestHTTPProbeChecker_PayloadNormal(t *testing.T) {
prober := New(false)
target, err := url.Parse(server.URL + "/success")
require.NoError(t, err)
result, body, err := prober.Probe(target, headers, wait.ForeverTestTimeout)
req, err := NewProbeRequest(target, headers)
require.NoError(t, err)
result, body, err := prober.Probe(req, wait.ForeverTestTimeout)
assert.NoError(t, err)
assert.Equal(t, probe.Success, result)
assert.Equal(t, string(normalPayload), body)

119
pkg/probe/http/request.go Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package http
import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/component-base/version"
"k8s.io/kubernetes/pkg/probe"
)
// NewProbeRequest returns an http.Request suitable for use as a request for a
// probe.
func NewProbeRequest(url *url.URL, headers http.Header) (*http.Request, error) {
return newProbeRequest(url, headers, "probe")
}
// NewRequestForHTTPGetAction returns an http.Request derived from httpGet.
// When httpGet.Host is empty, podIP will be used instead.
func NewRequestForHTTPGetAction(httpGet *v1.HTTPGetAction, container *v1.Container, podIP string, userAgentFragment string) (*http.Request, error) {
scheme := strings.ToLower(string(httpGet.Scheme))
if scheme == "" {
scheme = "http"
}
host := httpGet.Host
if host == "" {
host = podIP
}
port, err := probe.ResolveContainerPort(httpGet.Port, container)
if err != nil {
return nil, err
}
path := httpGet.Path
url := formatURL(scheme, host, port, path)
headers := v1HeaderToHTTPHeader(httpGet.HTTPHeaders)
return newProbeRequest(url, headers, userAgentFragment)
}
func newProbeRequest(url *url.URL, headers http.Header, userAgentFragment string) (*http.Request, error) {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
}
if headers == nil {
headers = http.Header{}
}
if _, ok := headers["User-Agent"]; !ok {
// User-Agent header was not defined, set it
headers.Set("User-Agent", userAgent(userAgentFragment))
}
if _, ok := headers["Accept"]; !ok {
// Accept header was not defined. accept all
headers.Set("Accept", "*/*")
} else if headers.Get("Accept") == "" {
// Accept header was overridden but is empty. removing
headers.Del("Accept")
}
req.Header = headers
req.Host = headers.Get("Host")
return req, nil
}
func userAgent(purpose string) string {
v := version.Get()
return fmt.Sprintf("kube-%s/%s.%s", purpose, v.Major, v.Minor)
}
// formatURL formats a URL from args. For testability.
func formatURL(scheme string, host string, port int, path string) *url.URL {
u, err := url.Parse(path)
// Something is busted with the path, but it's too late to reject it. Pass it along as is.
//
// This construction of a URL may be wrong in some cases, but it preserves
// legacy prober behavior.
if err != nil {
u = &url.URL{
Path: path,
}
}
u.Scheme = scheme
u.Host = net.JoinHostPort(host, strconv.Itoa(port))
return u
}
// v1HeaderToHTTPHeader takes a list of HTTPHeader <name, value> string pairs
// and returns a populated string->[]string http.Header map.
func v1HeaderToHTTPHeader(headerList []v1.HTTPHeader) http.Header {
headers := make(http.Header)
for _, header := range headerList {
headers[header.Name] = append(headers[header.Name], header.Value)
}
return headers
}

View File

@ -0,0 +1,40 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package http
import "testing"
func TestFormatURL(t *testing.T) {
testCases := []struct {
scheme string
host string
port int
path string
result string
}{
{"http", "localhost", 93, "", "http://localhost:93"},
{"https", "localhost", 93, "/path", "https://localhost:93/path"},
{"http", "localhost", 93, "?foo", "http://localhost:93?foo"},
{"https", "localhost", 93, "/path?bar", "https://localhost:93/path?bar"},
}
for _, test := range testCases {
url := formatURL(test.scheme, test.host, test.port, test.path)
if url.String() != test.result {
t.Errorf("Expected %s, got %s", test.result, url.String())
}
}
}

57
pkg/probe/util.go Normal file
View File

@ -0,0 +1,57 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package probe
import (
"fmt"
"strconv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
func ResolveContainerPort(param intstr.IntOrString, container *v1.Container) (int, error) {
port := -1
var err error
switch param.Type {
case intstr.Int:
port = param.IntValue()
case intstr.String:
if port, err = findPortByName(container, param.StrVal); err != nil {
// Last ditch effort - maybe it was an int stored as string?
if port, err = strconv.Atoi(param.StrVal); err != nil {
return port, err
}
}
default:
return port, fmt.Errorf("intOrString had no kind: %+v", param)
}
if port > 0 && port < 65536 {
return port, nil
}
return port, fmt.Errorf("invalid port number: %v", port)
}
// findPortByName is a helper function to look up a port in a container by name.
func findPortByName(container *v1.Container, portName string) (int, error) {
for _, port := range container.Ports {
if port.Name == portName {
return int(port.ContainerPort), nil
}
}
return 0, fmt.Errorf("port %s not found", portName)
}

43
pkg/probe/util_test.go Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package probe
import (
"testing"
v1 "k8s.io/api/core/v1"
)
func TestFindPortByName(t *testing.T) {
container := v1.Container{
Ports: []v1.ContainerPort{
{
Name: "foo",
ContainerPort: 8080,
},
{
Name: "bar",
ContainerPort: 9000,
},
},
}
want := 8080
got, err := findPortByName(&container, "foo")
if got != want || err != nil {
t.Errorf("Expected %v, got %v, err: %v", want, got, err)
}
}

View File

@ -27,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"net/http"
"net/url"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -43,7 +42,7 @@ type fakeHttpProber struct {
err error
}
func (f *fakeHttpProber) Probe(*url.URL, http.Header, time.Duration) (probe.Result, string, error) {
func (f *fakeHttpProber) Probe(*http.Request, time.Duration) (probe.Result, string, error) {
return f.result, f.body, f.err
}

View File

@ -18,6 +18,7 @@ package componentstatus
import (
"crypto/tls"
"fmt"
"sync"
"time"
@ -72,7 +73,11 @@ func (server *Server) DoServerCheck() (probe.Result, string, error) {
}
url := utilnet.FormatURL(scheme, server.Addr, server.Port, server.Path)
result, data, err := server.Prober.Probe(url, nil, probeTimeOut)
req, err := httpprober.NewProbeRequest(url, nil)
if err != nil {
return probe.Unknown, "", fmt.Errorf("failed to construct probe request: %w", err)
}
result, data, err := server.Prober.Probe(req, probeTimeOut)
if err != nil {
return probe.Unknown, "", err

View File

@ -44,14 +44,36 @@ var _ = SIGDescribe("Container Lifecycle Hook", func() {
preStopWaitTimeout = 30 * time.Second
)
ginkgo.Context("when create a pod with lifecycle hook", func() {
var targetIP, targetURL, targetNode string
ports := []v1.ContainerPort{
{
ContainerPort: 8080,
Protocol: v1.ProtocolTCP,
},
}
podHandleHookRequest := e2epod.NewAgnhostPod("", "pod-handle-http-request", nil, nil, ports, "netexec")
var (
targetIP, targetURL, targetNode string
httpPorts = []v1.ContainerPort{
{
ContainerPort: 8080,
Protocol: v1.ProtocolTCP,
},
}
httpsPorts = []v1.ContainerPort{
{
ContainerPort: 9090,
Protocol: v1.ProtocolTCP,
},
}
httpsArgs = []string{
"netexec",
"--http-port", "9090",
"--udp-port", "9091",
"--tls-cert-file", "/localhost.crt",
"--tls-private-key-file", "/localhost.key",
}
)
podHandleHookRequest := e2epod.NewAgnhostPodFromContainers(
"", "pod-handle-http-request", nil,
e2epod.NewAgnhostContainer("container-handle-http-request", nil, httpPorts, "netexec"),
e2epod.NewAgnhostContainer("container-handle-https-request", nil, httpsPorts, httpsArgs...),
)
ginkgo.BeforeEach(func() {
node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
framework.ExpectNoError(err)
@ -72,10 +94,20 @@ var _ = SIGDescribe("Container Lifecycle Hook", func() {
testPodWithHook := func(podWithHook *v1.Pod) {
ginkgo.By("create the pod with lifecycle hook")
podClient.CreateSync(podWithHook)
const (
defaultHandler = iota
httpsHandler
)
handlerContainer := defaultHandler
if podWithHook.Spec.Containers[0].Lifecycle.PostStart != nil {
ginkgo.By("check poststart hook")
if podWithHook.Spec.Containers[0].Lifecycle.PostStart.HTTPGet != nil {
if v1.URISchemeHTTPS == podWithHook.Spec.Containers[0].Lifecycle.PostStart.HTTPGet.Scheme {
handlerContainer = httpsHandler
}
}
gomega.Eventually(func() error {
return podClient.MatchContainerOutput(podHandleHookRequest.Name, podHandleHookRequest.Spec.Containers[0].Name,
return podClient.MatchContainerOutput(podHandleHookRequest.Name, podHandleHookRequest.Spec.Containers[handlerContainer].Name,
`GET /echo\?msg=poststart`)
}, postStartWaitTimeout, podCheckInterval).Should(gomega.BeNil())
}
@ -83,8 +115,13 @@ var _ = SIGDescribe("Container Lifecycle Hook", func() {
podClient.DeleteSync(podWithHook.Name, *metav1.NewDeleteOptions(15), e2epod.DefaultPodDeletionTimeout)
if podWithHook.Spec.Containers[0].Lifecycle.PreStop != nil {
ginkgo.By("check prestop hook")
if podWithHook.Spec.Containers[0].Lifecycle.PreStop.HTTPGet != nil {
if v1.URISchemeHTTPS == podWithHook.Spec.Containers[0].Lifecycle.PreStop.HTTPGet.Scheme {
handlerContainer = httpsHandler
}
}
gomega.Eventually(func() error {
return podClient.MatchContainerOutput(podHandleHookRequest.Name, podHandleHookRequest.Spec.Containers[0].Name,
return podClient.MatchContainerOutput(podHandleHookRequest.Name, podHandleHookRequest.Spec.Containers[handlerContainer].Name,
`GET /echo\?msg=prestop`)
}, preStopWaitTimeout, podCheckInterval).Should(gomega.BeNil())
}
@ -145,7 +182,26 @@ var _ = SIGDescribe("Container Lifecycle Hook", func() {
testPodWithHook(podWithHook)
})
/*
Release: v1.9
Release : v1.23
Testname: Pod Lifecycle, poststart https hook
Description: When a post-start handler is specified in the container lifecycle using a 'HttpGet' action, then the handler MUST be invoked before the container is terminated. A server pod is created that will serve https requests, create a second pod with a container lifecycle specifying a post-start that invokes the server pod to validate that the post-start is executed.
*/
ginkgo.It("should execute poststart https hook properly [MinimumKubeletVersion:1.23] [NodeConformance]", func() {
lifecycle := &v1.Lifecycle{
PostStart: &v1.LifecycleHandler{
HTTPGet: &v1.HTTPGetAction{
Scheme: v1.URISchemeHTTPS,
Path: "/echo?msg=poststart",
Host: targetIP,
Port: intstr.FromInt(9090),
},
},
}
podWithHook := getPodWithHook("pod-with-poststart-https-hook", imageutils.GetPauseImageName(), lifecycle)
testPodWithHook(podWithHook)
})
/*
Release : v1.9
Testname: Pod Lifecycle, prestop http hook
Description: When a pre-stop handler is specified in the container lifecycle using a 'HttpGet' action, then the handler MUST be invoked before the container is terminated. A server pod is created that will serve http requests, create a second pod on the same node with a container lifecycle specifying a pre-stop that invokes the server pod to validate that the pre-stop is executed.
*/
@ -166,6 +222,25 @@ var _ = SIGDescribe("Container Lifecycle Hook", func() {
e2epod.SetNodeSelection(&podWithHook.Spec, nodeSelection)
testPodWithHook(podWithHook)
})
/*
Release : v1.23
Testname: Pod Lifecycle, prestop https hook
Description: When a pre-stop handler is specified in the container lifecycle using a 'HttpGet' action, then the handler MUST be invoked before the container is terminated. A server pod is created that will serve https requests, create a second pod with a container lifecycle specifying a pre-stop that invokes the server pod to validate that the pre-stop is executed.
*/
ginkgo.It("should execute prestop https hook properly [MinimumKubeletVersion:1.23] [NodeConformance]", func() {
lifecycle := &v1.Lifecycle{
PreStop: &v1.LifecycleHandler{
HTTPGet: &v1.HTTPGetAction{
Scheme: v1.URISchemeHTTPS,
Path: "/echo?msg=prestop",
Host: targetIP,
Port: intstr.FromInt(9090),
},
},
}
podWithHook := getPodWithHook("pod-with-prestop-https-hook", imageutils.GetPauseImageName(), lifecycle)
testPodWithHook(podWithHook)
})
})
})

View File

@ -408,6 +408,23 @@ func NewAgnhostPod(ns, podName string, volumes []v1.Volume, mounts []v1.VolumeMo
return pod
}
func NewAgnhostPodFromContainers(ns, podName string, volumes []v1.Volume, containers ...v1.Container) *v1.Pod {
immediate := int64(0)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: ns,
},
Spec: v1.PodSpec{
Containers: containers[:],
Volumes: volumes,
SecurityContext: &v1.PodSecurityContext{},
TerminationGracePeriodSeconds: &immediate,
},
}
return pod
}
// NewAgnhostContainer returns the container Spec of an agnhost container.
func NewAgnhostContainer(containerName string, mounts []v1.VolumeMount, ports []v1.ContainerPort, args ...string) v1.Container {
if len(args) == 0 {