Merge pull request #128234 from aroradaman/kube-proxy-multi-listen

kube-proxy: use netutils.MultiListen for healthz and metrics server
This commit is contained in:
Kubernetes Prow Robot 2024-10-23 21:00:59 +01:00 committed by GitHub
commit c73aeaf5b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 51 additions and 34 deletions

View File

@ -419,7 +419,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
}
fn := func() {
err := hz.Run()
err := hz.Run(ctx)
if err != nil {
logger.Error(err, "Healthz server failed")
if errCh != nil {
@ -435,7 +435,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
go wait.Until(fn, 5*time.Second, ctx.Done())
}
func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
func serveMetrics(ctx context.Context, bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
if len(bindAddress) == 0 {
return
}
@ -460,17 +460,31 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enabl
configz.InstallHandler(proxyMux)
fn := func() {
err := http.ListenAndServe(bindAddress, proxyMux)
if err != nil {
err = fmt.Errorf("starting metrics server failed: %w", err)
utilruntime.HandleError(err)
if errCh != nil {
errCh <- err
// if in hardfail mode, never retry again
blockCh := make(chan error)
<-blockCh
var err error
defer func() {
if err != nil {
err = fmt.Errorf("starting metrics server failed: %w", err)
utilruntime.HandleError(err)
if errCh != nil {
errCh <- err
// if in hardfail mode, never retry again
blockCh := make(chan error)
<-blockCh
}
}
}()
listener, err := netutils.MultiListen(ctx, "tcp", bindAddress)
if err != nil {
return
}
server := &http.Server{Handler: proxyMux}
err = server.Serve(listener)
if err != nil {
return
}
}
go wait.Until(fn, 5*time.Second, wait.NeverStop)
}
@ -512,7 +526,7 @@ func (s *ProxyServer) Run(ctx context.Context) error {
serveHealthz(ctx, s.HealthzServer, healthzErrCh)
// Start up a metrics server if requested
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
serveMetrics(ctx, s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
if err != nil {

View File

@ -17,22 +17,25 @@ limitations under the License.
package healthcheck
import (
"context"
"net"
"net/http"
netutils "k8s.io/utils/net"
)
// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
type listener interface {
// Listen is very much like net.Listen, except the first arg (network) is
// Listen is very much like netutils.MultiListen, except the second arg (network) is
// fixed to be "tcp".
Listen(addr string) (net.Listener, error)
Listen(ctx context.Context, addrs ...string) (net.Listener, error)
}
// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
type httpServerFactory interface {
// New creates an instance of a type satisfying HTTPServer. This is
// designed to include http.Server.
New(addr string, handler http.Handler) httpServer
New(handler http.Handler) httpServer
}
// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
@ -45,8 +48,8 @@ type httpServer interface {
// Implement listener in terms of net.Listen.
type stdNetListener struct{}
func (stdNetListener) Listen(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
func (stdNetListener) Listen(ctx context.Context, addrs ...string) (net.Listener, error) {
return netutils.MultiListen(ctx, "tcp", addrs...)
}
var _ listener = stdNetListener{}
@ -54,9 +57,8 @@ var _ listener = stdNetListener{}
// Implement httpServerFactory in terms of http.Server.
type stdHTTPServerFactory struct{}
func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
func (stdHTTPServerFactory) New(handler http.Handler) httpServer {
return &http.Server{
Addr: addr,
Handler: handler,
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package healthcheck
import (
"context"
"encoding/json"
"net"
"net/http"
@ -54,17 +55,17 @@ func (fake *fakeListener) hasPort(addr string) bool {
return fake.openPorts.Has(addr)
}
func (fake *fakeListener) Listen(addr string) (net.Listener, error) {
fake.openPorts.Insert(addr)
func (fake *fakeListener) Listen(_ context.Context, addrs ...string) (net.Listener, error) {
fake.openPorts.Insert(addrs...)
return &fakeNetListener{
parent: fake,
addr: addr,
addrs: addrs,
}, nil
}
type fakeNetListener struct {
parent *fakeListener
addr string
addrs []string
}
type fakeAddr struct {
@ -82,7 +83,7 @@ func (fake *fakeNetListener) Accept() (net.Conn, error) {
}
func (fake *fakeNetListener) Close() error {
fake.parent.openPorts.Delete(fake.addr)
fake.parent.openPorts.Delete(fake.addrs...)
return nil
}
@ -97,15 +98,13 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
return &fakeHTTPServerFactory{}
}
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
func (fake *fakeHTTPServerFactory) New(handler http.Handler) httpServer {
return &fakeHTTPServer{
addr: addr,
handler: handler,
}
}
type fakeHTTPServer struct {
addr string
handler http.Handler
}
@ -471,7 +470,7 @@ func TestHealthzServer(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
server := hs.httpFactory.New(healthzHandler{hs: hs})
hsTest := &serverTest{
server: server,
@ -506,7 +505,7 @@ func TestLivezServer(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
server := hs.httpFactory.New(livezHandler{hs: hs})
hsTest := &serverTest{
server: server,

View File

@ -17,6 +17,7 @@ limitations under the License.
package healthcheck
import (
"context"
"fmt"
"net/http"
"sync"
@ -162,13 +163,13 @@ func (hs *ProxierHealthServer) NodeEligible() bool {
}
// Run starts the healthz HTTP server and blocks until it exits.
func (hs *ProxierHealthServer) Run() error {
func (hs *ProxierHealthServer) Run(ctx context.Context) error {
serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs})
serveMux.Handle("/livez", livezHandler{hs: hs})
server := hs.httpFactory.New(hs.addr, serveMux)
server := hs.httpFactory.New(serveMux)
listener, err := hs.listener.Listen(hs.addr)
listener, err := hs.listener.Listen(ctx, hs.addr)
if err != nil {
return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package healthcheck
import (
"context"
"fmt"
"net"
"net/http"
@ -170,9 +171,9 @@ func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
for _, ip := range hcs.nodeIPs {
addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port))
// create http server
httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
httpSrv := hcs.httpFactory.New(hcHandler{name: hcI.nsn, hcs: hcs})
// start listener
listener, err = hcs.listener.Listen(addr)
listener, err = hcs.listener.Listen(context.TODO(), addr)
if err != nil {
// must close whatever have been previously opened
// to allow a retry/or port ownership change as needed