Refactor Kubelet Server to take kubeConfiguration instead of multiple fields

This commit is contained in:
Sri Saran Balaji Vellore Rajakumar 2021-01-27 07:34:38 -08:00
parent 51cdf4e97b
commit af05a7eca3
5 changed files with 50 additions and 49 deletions

View File

@ -1187,9 +1187,7 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele
// start the kubelet server // start the kubelet server
if enableServer { if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints)
enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling,
kubeCfg.EnableSystemLogHandler, kubeCfg.EnableProfilingHandler, kubeCfg.EnableDebugFlagsHandler)
} }
if kubeCfg.ReadOnlyPort > 0 { if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints) go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)

View File

@ -195,9 +195,8 @@ type Bootstrap interface {
GetConfiguration() kubeletconfiginternal.KubeletConfiguration GetConfiguration() kubeletconfiginternal.KubeletConfiguration
BirthCry() BirthCry()
StartGarbageCollection() StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface,
enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler, enableCAdvisorJSONEndpoints bool)
enableProfilingHandler, enableDebugFlagsHandler bool)
ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool) ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
ListenAndServePodResources() ListenAndServePodResources()
Run(<-chan kubetypes.PodUpdate) Run(<-chan kubetypes.PodUpdate)
@ -2227,12 +2226,9 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
} }
// ListenAndServe runs the kubelet HTTP server. // ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler, auth server.AuthInterface, enableCAdvisorJSONEndpoints bool) {
enableProfilingHandler, enableDebugFlagsHandler bool) { server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, enableCAdvisorJSONEndpoints)
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth,
enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler,
enableProfilingHandler, enableDebugFlagsHandler)
} }
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.

View File

@ -19,6 +19,7 @@ go_library(
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/validation:go_default_library", "//pkg/apis/core/v1/validation:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/apis/podresources:go_default_library", "//pkg/kubelet/apis/podresources:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/cri/streaming:go_default_library", "//pkg/kubelet/cri/streaming:go_default_library",
@ -74,6 +75,7 @@ go_test(
deps = [ deps = [
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/apis/core/install:go_default_library", "//pkg/apis/core/install:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/cri/streaming:go_default_library", "//pkg/kubelet/cri/streaming:go_default_library",

View File

@ -66,6 +66,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/v1/validation" "k8s.io/kubernetes/pkg/apis/core/v1/validation"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/cri/streaming" "k8s.io/kubernetes/pkg/kubelet/cri/streaming"
@ -139,19 +140,15 @@ func (a *filteringContainer) RegisteredHandlePaths() []string {
func ListenAndServeKubeletServer( func ListenAndServeKubeletServer(
host HostInterface, host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer, resourceAnalyzer stats.ResourceAnalyzer,
address net.IP, kubeCfg *kubeletconfiginternal.KubeletConfiguration,
port uint,
tlsOptions *TLSOptions, tlsOptions *TLSOptions,
auth AuthInterface, auth AuthInterface,
enableCAdvisorJSONEndpoints, enableCAdvisorJSONEndpoints bool) {
enableDebuggingHandlers,
enableContentionProfiling, address := net.ParseIP(kubeCfg.Address)
enableSystemLogHandler, port := uint(kubeCfg.Port)
enableProfilingHandler, klog.InfoS("Starting to listen", "address", address, "port", port)
enableDebugFlagsHandler bool) { handler := NewServer(host, resourceAnalyzer, auth, enableCAdvisorJSONEndpoints, kubeCfg)
klog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers,
enableContentionProfiling, enableSystemLogHandler, enableProfilingHandler, enableDebugFlagsHandler)
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler, Handler: &handler,
@ -172,8 +169,8 @@ func ListenAndServeKubeletServer(
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, enableCAdvisorJSONEndpoints bool) { func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
klog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) klog.InfoS("Starting to listen read-only", "address", address, "port", port)
s := NewServer(host, resourceAnalyzer, nil, enableCAdvisorJSONEndpoints, false, false, false, false, false) s := NewServer(host, resourceAnalyzer, nil, enableCAdvisorJSONEndpoints, nil)
server := &http.Server{ server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -225,12 +222,8 @@ func NewServer(
host HostInterface, host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer, resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface, auth AuthInterface,
enableCAdvisorJSONEndpoints, enableCAdvisorJSONEndpoints bool,
enableDebuggingHandlers, kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
enableContentionProfiling,
enableSystemLogHandler,
enableProfilingHandler,
enableDebugFlagsHandler bool) Server {
server := Server{ server := Server{
host: host, host: host,
resourceAnalyzer: resourceAnalyzer, resourceAnalyzer: resourceAnalyzer,
@ -243,13 +236,13 @@ func NewServer(
server.InstallAuthFilter() server.InstallAuthFilter()
} }
server.InstallDefaultHandlers(enableCAdvisorJSONEndpoints) server.InstallDefaultHandlers(enableCAdvisorJSONEndpoints)
if enableDebuggingHandlers { if kubeCfg != nil && kubeCfg.EnableDebuggingHandlers {
server.InstallDebuggingHandlers() server.InstallDebuggingHandlers()
// To maintain backward compatibility serve logs and pprof only when enableDebuggingHandlers is also enabled // To maintain backward compatibility serve logs and pprof only when enableDebuggingHandlers is also enabled
// see https://github.com/kubernetes/kubernetes/pull/87273 // see https://github.com/kubernetes/kubernetes/pull/87273
server.InstallSystemLogHandler(enableSystemLogHandler) server.InstallSystemLogHandler(kubeCfg.EnableSystemLogHandler)
server.InstallProfilingHandler(enableProfilingHandler, enableContentionProfiling) server.InstallProfilingHandler(kubeCfg.EnableProfilingHandler, kubeCfg.EnableContentionProfiling)
server.InstallDebugFlagsHandler(enableDebugFlagsHandler) server.InstallDebugFlagsHandler(kubeCfg.EnableDebugFlagsHandler)
} else { } else {
server.InstallDebuggingDisabledHandlers() server.InstallDebuggingDisabledHandlers()
} }
@ -563,9 +556,9 @@ func (s *Server) InstallDebugFlagsHandler(enableDebugFlagsHandler bool) {
} }
// InstallProfilingHandler registers the HTTP request patterns for /debug/pprof endpoint. // InstallProfilingHandler registers the HTTP request patterns for /debug/pprof endpoint.
func (s *Server) InstallProfilingHandler(enableSystemLogHandler bool, enableContentionProfiling bool) { func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableContentionProfiling bool) {
s.addMetricsBucketMatcher("debug") s.addMetricsBucketMatcher("debug")
if !enableSystemLogHandler { if !enableProfilingLogHandler {
s.restfulCont.Handle(pprofBasePath, getHandlerForDisabledEndpoint("profiling endpoint is disabled.")) s.restfulCont.Handle(pprofBasePath, getHandlerForDisabledEndpoint("profiling endpoint is disabled."))
return return
} }

View File

@ -55,6 +55,7 @@ import (
// Do some initialization to decode the query parameters correctly. // Do some initialization to decode the query parameters correctly.
_ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/core/install"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/cri/streaming" "k8s.io/kubernetes/pkg/kubelet/cri/streaming"
@ -304,10 +305,17 @@ func newServerTest() *serverTestFramework {
} }
func newServerTestWithDebug(enableDebugging bool, streamingServer streaming.Server) *serverTestFramework { func newServerTestWithDebug(enableDebugging bool, streamingServer streaming.Server) *serverTestFramework {
return newServerTestWithDebuggingHandlers(enableDebugging, enableDebugging, streamingServer) kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
EnableDebuggingHandlers: enableDebugging,
EnableSystemLogHandler: enableDebugging,
EnableProfilingHandler: enableDebugging,
EnableDebugFlagsHandler: enableDebugging,
}
return newServerTestWithDebuggingHandlers(kubeCfg, streamingServer)
} }
func newServerTestWithDebuggingHandlers(enableDebugging, enableLogAndProfilingHandler bool, streamingServer streaming.Server) *serverTestFramework { func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletConfiguration, streamingServer streaming.Server) *serverTestFramework {
fw := &serverTestFramework{} fw := &serverTestFramework{}
fw.fakeKubelet = &fakeKubelet{ fw.fakeKubelet = &fakeKubelet{
hostnameFunc: func() string { hostnameFunc: func() string {
@ -341,11 +349,7 @@ func newServerTestWithDebuggingHandlers(enableDebugging, enableLogAndProfilingHa
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute), stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
fw.fakeAuth, fw.fakeAuth,
true, true,
enableDebugging, kubeCfg)
false,
enableLogAndProfilingHandler,
enableLogAndProfilingHandler,
enableLogAndProfilingHandler)
fw.serverUnderTest = &server fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw return fw
@ -1506,9 +1510,15 @@ func TestMetricMethodBuckets(t *testing.T) {
} }
func TestDebuggingDisabledHandlers(t *testing.T) { func TestDebuggingDisabledHandlers(t *testing.T) {
// for backward compatibility even if enablesystemLogHandler is set but not enableDebuggingHandler then /logs // for backward compatibility even if enablesystemLogHandler or enableProfilingHandler is set but not
//shouldn't be served. // enableDebuggingHandler then /logs, /pprof and /flags shouldn't be served.
fw := newServerTestWithDebuggingHandlers(false, true, nil) kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
EnableDebuggingHandlers: false,
EnableSystemLogHandler: true,
EnableDebugFlagsHandler: true,
EnableProfilingHandler: true,
}
fw := newServerTestWithDebuggingHandlers(kubeCfg, nil)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()
paths := []string{ paths := []string{
@ -1548,11 +1558,13 @@ func TestDebuggingDisabledHandlers(t *testing.T) {
resp, err = http.Get(fw.testHTTPServer.URL + "/spec") resp, err = http.Get(fw.testHTTPServer.URL + "/spec")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Equal(t, http.StatusOK, resp.StatusCode)
} }
func TestDisablingLogAndProfilingHandler(t *testing.T) { func TestDisablingLogAndProfilingHandler(t *testing.T) {
fw := newServerTestWithDebuggingHandlers(true, false, nil) kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
EnableDebuggingHandlers: true,
}
fw := newServerTestWithDebuggingHandlers(kubeCfg, nil)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()
// verify debug endpoints are disabled // verify debug endpoints are disabled