diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index bcdb33e4354..ca519e04935 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -127,7 +127,8 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS kubeRuntimeManager.runner = lifecycle.NewHandlerRunner( &fakeHTTP{}, kubeRuntimeManager, - kubeRuntimeManager) + kubeRuntimeManager, + recorder) kubeRuntimeManager.getNodeAllocatable = func() v1.ResourceList { return v1.ResourceList{ diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go index 2f7ce8f7d1f..5dd78a6e894 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go @@ -295,7 +295,8 @@ func TestLifeCycleHook(t *testing.T) { lcHanlder := lifecycle.NewHandlerRunner( fakeHTTP, fakeRunner, - fakePodStatusProvider) + fakePodStatusProvider, + nil) m.runner = lcHanlder diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 7ead52b09ef..7e245cd00bd 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -265,7 +265,7 @@ func NewKubeGenericRuntimeManager( serializeImagePulls, imagePullQPS, imagePullBurst) - kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager) + kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager, recorder) kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager) kubeRuntimeManager.podStateProvider = podStateProvider diff --git a/pkg/kubelet/lifecycle/handlers.go b/pkg/kubelet/lifecycle/handlers.go index a8a98e012a6..66bb834d3b3 
100644 --- a/pkg/kubelet/lifecycle/handlers.go +++ b/pkg/kubelet/lifecycle/handlers.go @@ -31,9 +31,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/metrics" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" httpprobe "k8s.io/kubernetes/pkg/probe/http" @@ -48,6 +50,7 @@ type handlerRunner struct { httpDoer kubetypes.HTTPDoer commandRunner kubecontainer.CommandRunner containerManager podStatusProvider + eventRecorder record.EventRecorder } type podStatusProvider interface { @@ -55,11 +58,12 @@ type podStatusProvider interface { } // NewHandlerRunner returns a configured lifecycle handler for a container. -func NewHandlerRunner(httpDoer kubetypes.HTTPDoer, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner { +func NewHandlerRunner(httpDoer kubetypes.HTTPDoer, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider, eventRecorder record.EventRecorder) kubecontainer.HandlerRunner { return &handlerRunner{ httpDoer: httpDoer, commandRunner: commandRunner, containerManager: containerManager, + eventRecorder: eventRecorder, } } @@ -75,7 +79,7 @@ func (hr *handlerRunner) Run(containerID kubecontainer.ContainerID, pod *v1.Pod, } return msg, err case handler.HTTPGet != nil: - err := hr.runHTTPHandler(pod, container, handler) + err := hr.runHTTPHandler(pod, container, handler, hr.eventRecorder) var msg string if err != nil { msg = fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v", handler.HTTPGet.Path, container.Name, format.Pod(pod), err) @@ -113,7 +117,7 @@ func resolvePort(portReference intstr.IntOrString, container *v1.Container) (int return -1, fmt.Errorf("couldn't 
find port: %v in %v", portReference, container) } -func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler) error { +func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler, eventRecorder record.EventRecorder) error { host := handler.HTTPGet.Host podIP := host if len(host) == 0 { @@ -138,8 +142,6 @@ func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, ha discardHTTPRespBody(resp) if isHTTPResponseError(err) { - // TODO: emit an event about the fallback - // TODO: increment a metric about the fallback klog.V(1).ErrorS(err, "HTTPS request to lifecycle hook got HTTP response, retrying with HTTP.", "pod", klog.KObj(pod), "host", req.URL.Host) req := req.Clone(context.Background()) @@ -149,6 +151,11 @@ func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, ha // clear err since the fallback succeeded if httpErr == nil { + metrics.LifecycleHandlerHTTPFallbacks.Inc() + if eventRecorder != nil { + // report the fallback with an event + eventRecorder.Event(pod, v1.EventTypeWarning, "LifecycleHTTPFallback", fmt.Sprintf("request to HTTPS lifecycle hook %s got HTTP response, retry with HTTP succeeded", req.URL.Host)) + } err = nil } discardHTTPRespBody(resp) diff --git a/pkg/kubelet/lifecycle/handlers_test.go b/pkg/kubelet/lifecycle/handlers_test.go index 0dbdb24941e..5d017b89df3 100644 --- a/pkg/kubelet/lifecycle/handlers_test.go +++ b/pkg/kubelet/lifecycle/handlers_test.go @@ -32,9 +32,13 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/tools/record" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + 
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/util/format" ) @@ -115,7 +119,7 @@ func (f podStatusProviderFunc) GetPodStatus(uid types.UID, name, namespace strin func TestRunHandlerExec(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeCommandRunner, nil) + handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeCommandRunner, nil, nil) containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"} containerName := "containerFoo" @@ -161,7 +165,7 @@ func (f *fakeHTTP) Do(req *http.Request) (*http.Response, error) { func TestRunHandlerHttp(t *testing.T) { fakeHTTPGetter := fakeHTTP{} fakePodStatusProvider := stubPodStatusProvider("127.0.0.1") - handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, fakePodStatusProvider) + handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, fakePodStatusProvider, nil) containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"} containerName := "containerFoo" @@ -197,7 +201,7 @@ func TestRunHandlerHttpWithHeaders(t *testing.T) { fakeHTTPDoer := fakeHTTP{} fakePodStatusProvider := stubPodStatusProvider("127.0.0.1") - handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider) + handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider, nil) containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"} containerName := "containerFoo" @@ -237,7 +241,7 @@ func TestRunHandlerHttpWithHeaders(t *testing.T) { func TestRunHandlerHttps(t *testing.T) { fakeHTTPDoer := fakeHTTP{} fakePodStatusProvider := stubPodStatusProvider("127.0.0.1") - handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider) + handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider, nil) containerID := 
kubecontainer.ContainerID{Type: "test", ID: "abc1234"} containerName := "containerFoo" @@ -345,7 +349,7 @@ func TestRunHandlerHTTPPort(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, tt.FeatureGateEnabled)() fakeHTTPDoer := fakeHTTP{} - handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider) + handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider, nil) container.Lifecycle.PostStart.HTTPGet.Port = tt.Port pod.Spec.Containers = []v1.Container{container} @@ -621,7 +625,7 @@ func TestRunHTTPHandler(t *testing.T) { verify := func(t *testing.T, expectedHeader http.Header, expectedURL string) { fakeHTTPDoer := fakeHTTP{} - handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider) + handlerRunner := NewHandlerRunner(&fakeHTTPDoer, &fakeContainerCommandRunner{}, fakePodStatusProvider, nil) _, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart) if err != nil { @@ -650,7 +654,7 @@ func TestRunHTTPHandler(t *testing.T) { } func TestRunHandlerNil(t *testing.T) { - handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, nil) + handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, nil, nil) containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"} podName := "podFoo" podNamespace := "nsFoo" @@ -675,7 +679,7 @@ func TestRunHandlerNil(t *testing.T) { func TestRunHandlerExecFailure(t *testing.T) { expectedErr := fmt.Errorf("invalid command") fakeCommandRunner := fakeContainerCommandRunner{Err: expectedErr, Msg: expectedErr.Error()} - handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeCommandRunner, nil) + handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeCommandRunner, nil, nil) containerID := kubecontainer.ContainerID{Type: 
"test", ID: "abc1234"} containerName := "containerFoo" @@ -715,7 +719,7 @@ func TestRunHandlerHttpFailure(t *testing.T) { fakePodStatusProvider := stubPodStatusProvider("127.0.0.1") - handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, fakePodStatusProvider) + handlerRunner := NewHandlerRunner(&fakeHTTPGetter, &fakeContainerCommandRunner{}, fakePodStatusProvider, nil) containerName := "containerFoo" containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"} @@ -749,6 +753,14 @@ func TestRunHandlerHttpFailure(t *testing.T) { } func TestRunHandlerHttpsFailureFallback(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, true)() + + // Since prometheus' gatherer is global, other tests may have updated metrics already, so + // we need to reset them prior to running this test. + // This also implies that we can't run this test in parallel with other tests. + metrics.Register() + legacyregistry.Reset() + var actualHeaders http.Header srv := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { actualHeaders = r.Header.Clone() @@ -759,9 +771,11 @@ func TestRunHandlerHttpsFailureFallback(t *testing.T) { t.Fatal(err) } + recorder := &record.FakeRecorder{Events: make(chan string, 10)} + fakePodStatusProvider := stubPodStatusProvider("127.0.0.1") - handlerRunner := NewHandlerRunner(srv.Client(), &fakeContainerCommandRunner{}, fakePodStatusProvider).(*handlerRunner) + handlerRunner := NewHandlerRunner(srv.Client(), &fakeContainerCommandRunner{}, fakePodStatusProvider, recorder).(*handlerRunner) containerName := "containerFoo" containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"} @@ -789,7 +803,6 @@ func TestRunHandlerHttpsFailureFallback(t *testing.T) { pod.ObjectMeta.Name = "podFoo" pod.ObjectMeta.Namespace = "nsFoo" pod.Spec.Containers = []v1.Container{container} - defer 
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, true)() msg, err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart) if err != nil { @@ -801,6 +814,25 @@ func TestRunHandlerHttpsFailureFallback(t *testing.T) { if actualHeaders.Get("Authorization") != "" { t.Error("unexpected Authorization header") } + + expectedMetrics := ` +# HELP kubelet_lifecycle_handler_http_fallbacks_total [ALPHA] The number of times lifecycle handlers successfully fell back to http from https. +# TYPE kubelet_lifecycle_handler_http_fallbacks_total counter +kubelet_lifecycle_handler_http_fallbacks_total 1 +` + + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedMetrics), "kubelet_lifecycle_handler_http_fallbacks_total"); err != nil { + t.Fatal(err) + } + + select { + case event := <-recorder.Events: + if !strings.Contains(event, "LifecycleHTTPFallback") { + t.Fatalf("expected LifecycleHTTPFallback event, got %q", event) + } + default: + t.Fatal("no event recorded") + } } func TestIsHTTPResponseError(t *testing.T) { diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 99217eb8103..00fe1ec5572 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -497,6 +497,15 @@ var ( StabilityLevel: metrics.ALPHA, }, ) + + LifecycleHandlerHTTPFallbacks = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: "lifecycle_handler_http_fallbacks_total", + Help: "The number of times lifecycle handlers successfully fell back to http from https.", + StabilityLevel: metrics.ALPHA, + }, + ) ) var registerMetrics sync.Once @@ -558,6 +567,9 @@ func Register(collectors ...metrics.StableCollector) { legacyregistry.MustRegister(GracefulShutdownEndTime) } + if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) { + legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks) + 
} }) }