Clean up kube-proxy healthz startup

Make the healthz package simpler, move retries back to caller.
This commit is contained in:
Tim Hockin 2020-03-26 23:11:10 -07:00
parent fce286e227
commit 8747ba9370
2 changed files with 39 additions and 29 deletions

View File

@ -576,6 +576,23 @@ func createClients(config componentbaseconfig.ClientConnectionConfiguration, mas
return client, eventClient.CoreV1(), nil
}
func serveHealthz(hz healthcheck.ProxierHealthUpdater) {
if hz == nil {
return
}
fn := func() {
err := hz.Run()
if err != nil {
// For historical reasons we do not abort on errors here. We may
// change that in the future.
klog.Errorf("healthz server failed: %v", err)
} else {
klog.Errorf("healthz server returned without error")
}
}
go wait.Until(fn, 5*time.Second, wait.NeverStop)
}
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error {
@ -595,10 +612,10 @@ func (s *ProxyServer) Run() error {
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
}
// TODO(thockin): make it possible for healthz and metrics to be on the same port.
// Start up a healthz server if requested
if s.HealthzServer != nil {
s.HealthzServer.Run()
}
serveHealthz(s.HealthzServer)
// Start up a metrics server if requested
if len(s.MetricsBindAddress) > 0 {

View File

@ -22,17 +22,13 @@ import (
"sync/atomic"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
api "k8s.io/kubernetes/pkg/apis/core"
)
var proxierHealthzRetryInterval = 60 * time.Second
// ProxierHealthUpdater allows callers to update healthz timestamp only.
type ProxierHealthUpdater interface {
// QueuedUpdate should be called when the proxier receives a Service or Endpoints
@ -43,8 +39,8 @@ type ProxierHealthUpdater interface {
// rules to reflect the current state.
Updated()
// Run starts the healthz http server and returns.
Run()
// Run starts the healthz HTTP server and blocks until it exits.
Run() error
}
var _ ProxierHealthUpdater = &proxierHealthServer{}
@ -92,31 +88,28 @@ func (hs *proxierHealthServer) QueuedUpdate() {
hs.lastQueued.Store(hs.clock.Now())
}
// Run starts the healthz http server and returns.
func (hs *proxierHealthServer) Run() {
// Run starts the healthz HTTP server and blocks until it exits.
func (hs *proxierHealthServer) Run() error {
serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs})
server := hs.httpFactory.New(hs.addr, serveMux)
go wait.Until(func() {
klog.V(3).Infof("Starting goroutine for proxier healthz on %s", hs.addr)
listener, err := hs.listener.Listen(hs.addr)
if err != nil {
msg := fmt.Sprintf("Failed to start proxier healthz on %s: %v", hs.addr, err)
if hs.recorder != nil {
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
}
klog.Error(msg)
return
listener, err := hs.listener.Listen(hs.addr)
if err != nil {
msg := fmt.Sprintf("failed to start proxier healthz on %s: %v", hs.addr, err)
// TODO(thockin): move eventing back to caller
if hs.recorder != nil {
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
}
return fmt.Errorf("%v", msg)
}
if err := server.Serve(listener); err != nil {
klog.Errorf("Proxier healthz closed with error: %v", err)
return
}
klog.Error("Unexpected proxier healthz closed.")
}, proxierHealthzRetryInterval, wait.NeverStop)
klog.V(3).Infof("starting healthz on %s", hs.addr)
if err := server.Serve(listener); err != nil {
return fmt.Errorf("proxier healthz closed with error: %v", err)
}
return nil
}
type healthzHandler struct {