diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 91d4d6e93b6..385726f907e 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -419,7 +419,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC } fn := func() { - err := hz.Run() + err := hz.Run(ctx) if err != nil { logger.Error(err, "Healthz server failed") if errCh != nil { @@ -435,7 +435,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC go wait.Until(fn, 5*time.Second, ctx.Done()) } -func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) { +func serveMetrics(ctx context.Context, bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) { if len(bindAddress) == 0 { return } @@ -460,17 +460,31 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enabl configz.InstallHandler(proxyMux) fn := func() { - err := http.ListenAndServe(bindAddress, proxyMux) - if err != nil { - err = fmt.Errorf("starting metrics server failed: %w", err) - utilruntime.HandleError(err) - if errCh != nil { - errCh <- err - // if in hardfail mode, never retry again - blockCh := make(chan error) - <-blockCh + var err error + defer func() { + if err != nil { + err = fmt.Errorf("starting metrics server failed: %w", err) + utilruntime.HandleError(err) + if errCh != nil { + errCh <- err + // if in hardfail mode, never retry again + blockCh := make(chan error) + <-blockCh + } } + }() + + listener, err := netutils.MultiListen(ctx, "tcp", bindAddress) + if err != nil { + return } + + server := &http.Server{Handler: proxyMux} + err = server.Serve(listener) + if err != nil { + return + } + } go wait.Until(fn, 5*time.Second, wait.NeverStop) } @@ -512,7 +526,7 @@ func (s *ProxyServer) Run(ctx context.Context) error { serveHealthz(ctx, s.HealthzServer, healthzErrCh) // Start up a metrics server if requested - serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh) + serveMetrics(ctx, s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh) noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil) if err != nil { diff --git a/pkg/proxy/healthcheck/common.go b/pkg/proxy/healthcheck/common.go index 2013508c1c4..c7a9f8bfbe0 100644 --- a/pkg/proxy/healthcheck/common.go +++ b/pkg/proxy/healthcheck/common.go @@ -17,22 +17,25 @@ limitations under the License. package healthcheck import ( + "context" "net" "net/http" + + netutils "k8s.io/utils/net" ) // listener allows for testing of ServiceHealthServer and ProxierHealthServer. type listener interface { - // Listen is very much like net.Listen, except the first arg (network) is + // Listen is very much like netutils.MultiListen, except the second arg (network) is // fixed to be "tcp". - Listen(addr string) (net.Listener, error) + Listen(ctx context.Context, addrs ...string) (net.Listener, error) } // httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer. type httpServerFactory interface { // New creates an instance of a type satisfying HTTPServer. This is // designed to include http.Server. - New(addr string, handler http.Handler) httpServer + New(handler http.Handler) httpServer } // httpServer allows for testing of ServiceHealthServer and ProxierHealthServer. @@ -45,8 +48,8 @@ type httpServer interface { // Implement listener in terms of net.Listen. type stdNetListener struct{} -func (stdNetListener) Listen(addr string) (net.Listener, error) { - return net.Listen("tcp", addr) +func (stdNetListener) Listen(ctx context.Context, addrs ...string) (net.Listener, error) { + return netutils.MultiListen(ctx, "tcp", addrs...) } var _ listener = stdNetListener{} @@ -54,9 +57,8 @@ var _ listener = stdNetListener{} // Implement httpServerFactory in terms of http.Server. type stdHTTPServerFactory struct{} -func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer { +func (stdHTTPServerFactory) New(handler http.Handler) httpServer { return &http.Server{ - Addr: addr, Handler: handler, } } diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index a3047a1829d..777f529897e 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -17,6 +17,7 @@ limitations under the License. package healthcheck import ( + "context" "encoding/json" "net" "net/http" @@ -54,17 +55,17 @@ func (fake *fakeListener) hasPort(addr string) bool { return fake.openPorts.Has(addr) } -func (fake *fakeListener) Listen(addr string) (net.Listener, error) { - fake.openPorts.Insert(addr) +func (fake *fakeListener) Listen(_ context.Context, addrs ...string) (net.Listener, error) { + fake.openPorts.Insert(addrs...) return &fakeNetListener{ parent: fake, - addr: addr, + addrs: addrs, }, nil } type fakeNetListener struct { parent *fakeListener - addr string + addrs []string } type fakeAddr struct { @@ -82,7 +83,7 @@ func (fake *fakeNetListener) Accept() (net.Conn, error) { } func (fake *fakeNetListener) Close() error { - fake.parent.openPorts.Delete(fake.addr) + fake.parent.openPorts.Delete(fake.addrs...) return nil } @@ -97,15 +98,13 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory { return &fakeHTTPServerFactory{} } -func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer { +func (fake *fakeHTTPServerFactory) New(handler http.Handler) httpServer { return &fakeHTTPServer{ - addr: addr, handler: handler, } } type fakeHTTPServer struct { - addr string handler http.Handler } @@ -471,7 +470,7 @@ func TestHealthzServer(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second) - server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) + server := hs.httpFactory.New(healthzHandler{hs: hs}) hsTest := &serverTest{ server: server, @@ -506,7 +505,7 @@ func TestLivezServer(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second) - server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs}) + server := hs.httpFactory.New(livezHandler{hs: hs}) hsTest := &serverTest{ server: server, diff --git a/pkg/proxy/healthcheck/proxier_health.go b/pkg/proxy/healthcheck/proxier_health.go index 7b009fba135..a76437189bd 100644 --- a/pkg/proxy/healthcheck/proxier_health.go +++ b/pkg/proxy/healthcheck/proxier_health.go @@ -17,6 +17,7 @@ limitations under the License. package healthcheck import ( + "context" "fmt" "net/http" "sync" @@ -162,13 +163,13 @@ func (hs *ProxierHealthServer) NodeEligible() bool { } // Run starts the healthz HTTP server and blocks until it exits. -func (hs *ProxierHealthServer) Run() error { +func (hs *ProxierHealthServer) Run(ctx context.Context) error { serveMux := http.NewServeMux() serveMux.Handle("/healthz", healthzHandler{hs: hs}) serveMux.Handle("/livez", livezHandler{hs: hs}) - server := hs.httpFactory.New(hs.addr, serveMux) + server := hs.httpFactory.New(serveMux) - listener, err := hs.listener.Listen(hs.addr) + listener, err := hs.listener.Listen(ctx, hs.addr) if err != nil { return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err) } diff --git a/pkg/proxy/healthcheck/service_health.go b/pkg/proxy/healthcheck/service_health.go index 25d48285560..44eb8b687cf 100644 --- a/pkg/proxy/healthcheck/service_health.go +++ b/pkg/proxy/healthcheck/service_health.go @@ -17,6 +17,7 @@ limitations under the License. package healthcheck import ( + "context" "fmt" "net" "net/http" @@ -170,9 +171,9 @@ func (hcI *hcInstance) listenAndServeAll(hcs *server) error { for _, ip := range hcs.nodeIPs { addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port)) // create http server - httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs}) + httpSrv := hcs.httpFactory.New(hcHandler{name: hcI.nsn, hcs: hcs}) // start listener - listener, err = hcs.listener.Listen(addr) + listener, err = hcs.listener.Listen(context.TODO(), addr) if err != nil { // must close whatever have been previously opened // to allow a retry/or port ownership change as needed