mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
commit
9f7aaf4174
@ -576,6 +576,57 @@ func createClients(config componentbaseconfig.ClientConnectionConfiguration, mas
|
|||||||
return client, eventClient.CoreV1(), nil
|
return client, eventClient.CoreV1(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func serveHealthz(hz healthcheck.ProxierHealthUpdater) {
|
||||||
|
if hz == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func() {
|
||||||
|
err := hz.Run()
|
||||||
|
if err != nil {
|
||||||
|
// For historical reasons we do not abort on errors here. We may
|
||||||
|
// change that in the future.
|
||||||
|
klog.Errorf("healthz server failed: %v", err)
|
||||||
|
} else {
|
||||||
|
klog.Errorf("healthz server returned without error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go wait.Until(fn, 5*time.Second, wait.NeverStop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func serveMetrics(bindAddress string, proxyMode string, enableProfiling bool) {
|
||||||
|
if len(bindAddress) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyMux := mux.NewPathRecorderMux("kube-proxy")
|
||||||
|
healthz.InstallHandler(proxyMux)
|
||||||
|
proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||||
|
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||||
|
fmt.Fprintf(w, "%s", proxyMode)
|
||||||
|
})
|
||||||
|
|
||||||
|
//lint:ignore SA1019 See the Metrics Stability Migration KEP
|
||||||
|
proxyMux.Handle("/metrics", legacyregistry.Handler())
|
||||||
|
|
||||||
|
if enableProfiling {
|
||||||
|
routes.Profiling{}.Install(proxyMux)
|
||||||
|
}
|
||||||
|
|
||||||
|
configz.InstallHandler(proxyMux)
|
||||||
|
|
||||||
|
fn := func() {
|
||||||
|
err := http.ListenAndServe(bindAddress, proxyMux)
|
||||||
|
if err != nil {
|
||||||
|
// For historical reasons we do not abort on errors here. We may
|
||||||
|
// change that in the future.
|
||||||
|
utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go wait.Until(fn, 5*time.Second, wait.NeverStop)
|
||||||
|
}
|
||||||
|
|
||||||
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
|
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
|
||||||
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
|
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
|
||||||
func (s *ProxyServer) Run() error {
|
func (s *ProxyServer) Run() error {
|
||||||
@ -595,33 +646,13 @@ func (s *ProxyServer) Run() error {
|
|||||||
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
|
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(thockin): make it possible for healthz and metrics to be on the same port.
|
||||||
|
|
||||||
// Start up a healthz server if requested
|
// Start up a healthz server if requested
|
||||||
if s.HealthzServer != nil {
|
serveHealthz(s.HealthzServer)
|
||||||
s.HealthzServer.Run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start up a metrics server if requested
|
// Start up a metrics server if requested
|
||||||
if len(s.MetricsBindAddress) > 0 {
|
serveMetrics(s.MetricsBindAddress, s.ProxyMode, s.EnableProfiling)
|
||||||
proxyMux := mux.NewPathRecorderMux("kube-proxy")
|
|
||||||
healthz.InstallHandler(proxyMux)
|
|
||||||
proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
||||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
|
||||||
fmt.Fprintf(w, "%s", s.ProxyMode)
|
|
||||||
})
|
|
||||||
//lint:ignore SA1019 See the Metrics Stability Migration KEP
|
|
||||||
proxyMux.Handle("/metrics", legacyregistry.Handler())
|
|
||||||
if s.EnableProfiling {
|
|
||||||
routes.Profiling{}.Install(proxyMux)
|
|
||||||
}
|
|
||||||
configz.InstallHandler(proxyMux)
|
|
||||||
go wait.Until(func() {
|
|
||||||
err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
|
|
||||||
if err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
|
|
||||||
}
|
|
||||||
}, 5*time.Second, wait.NeverStop)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tune conntrack, if requested
|
// Tune conntrack, if requested
|
||||||
// Conntracker is always nil for windows
|
// Conntracker is always nil for windows
|
||||||
|
@ -20,7 +20,6 @@ go_library(
|
|||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||||
"//vendor/github.com/lithammer/dedent:go_default_library",
|
"//vendor/github.com/lithammer/dedent:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
|
@ -22,17 +22,13 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog"
|
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/klog"
|
||||||
api "k8s.io/kubernetes/pkg/apis/core"
|
api "k8s.io/kubernetes/pkg/apis/core"
|
||||||
)
|
)
|
||||||
|
|
||||||
var proxierHealthzRetryInterval = 60 * time.Second
|
|
||||||
|
|
||||||
// ProxierHealthUpdater allows callers to update healthz timestamp only.
|
// ProxierHealthUpdater allows callers to update healthz timestamp only.
|
||||||
type ProxierHealthUpdater interface {
|
type ProxierHealthUpdater interface {
|
||||||
// QueuedUpdate should be called when the proxier receives a Service or Endpoints
|
// QueuedUpdate should be called when the proxier receives a Service or Endpoints
|
||||||
@ -43,8 +39,8 @@ type ProxierHealthUpdater interface {
|
|||||||
// rules to reflect the current state.
|
// rules to reflect the current state.
|
||||||
Updated()
|
Updated()
|
||||||
|
|
||||||
// Run starts the healthz http server and returns.
|
// Run starts the healthz HTTP server and blocks until it exits.
|
||||||
Run()
|
Run() error
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ ProxierHealthUpdater = &proxierHealthServer{}
|
var _ ProxierHealthUpdater = &proxierHealthServer{}
|
||||||
@ -92,31 +88,28 @@ func (hs *proxierHealthServer) QueuedUpdate() {
|
|||||||
hs.lastQueued.Store(hs.clock.Now())
|
hs.lastQueued.Store(hs.clock.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the healthz http server and returns.
|
// Run starts the healthz HTTP server and blocks until it exits.
|
||||||
func (hs *proxierHealthServer) Run() {
|
func (hs *proxierHealthServer) Run() error {
|
||||||
serveMux := http.NewServeMux()
|
serveMux := http.NewServeMux()
|
||||||
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
||||||
server := hs.httpFactory.New(hs.addr, serveMux)
|
server := hs.httpFactory.New(hs.addr, serveMux)
|
||||||
|
|
||||||
go wait.Until(func() {
|
listener, err := hs.listener.Listen(hs.addr)
|
||||||
klog.V(3).Infof("Starting goroutine for proxier healthz on %s", hs.addr)
|
if err != nil {
|
||||||
|
msg := fmt.Sprintf("failed to start proxier healthz on %s: %v", hs.addr, err)
|
||||||
listener, err := hs.listener.Listen(hs.addr)
|
// TODO(thockin): move eventing back to caller
|
||||||
if err != nil {
|
if hs.recorder != nil {
|
||||||
msg := fmt.Sprintf("Failed to start proxier healthz on %s: %v", hs.addr, err)
|
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
|
||||||
if hs.recorder != nil {
|
|
||||||
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
|
|
||||||
}
|
|
||||||
klog.Error(msg)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
return fmt.Errorf("%v", msg)
|
||||||
|
}
|
||||||
|
|
||||||
if err := server.Serve(listener); err != nil {
|
klog.V(3).Infof("starting healthz on %s", hs.addr)
|
||||||
klog.Errorf("Proxier healthz closed with error: %v", err)
|
|
||||||
return
|
if err := server.Serve(listener); err != nil {
|
||||||
}
|
return fmt.Errorf("proxier healthz closed with error: %v", err)
|
||||||
klog.Error("Unexpected proxier healthz closed.")
|
}
|
||||||
}, proxierHealthzRetryInterval, wait.NeverStop)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type healthzHandler struct {
|
type healthzHandler struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user