mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
kube-proxy: use netutils.MultiListen for healthz and metrics server
Signed-off-by: Daman Arora <aroradaman@gmail.com>
(cherry picked from commit 7ce36f9bca
)
This commit is contained in:
parent
f1e447b9d3
commit
0aa9dc84ab
@ -419,7 +419,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
|
||||
}
|
||||
|
||||
fn := func() {
|
||||
err := hz.Run()
|
||||
err := hz.Run(ctx)
|
||||
if err != nil {
|
||||
logger.Error(err, "Healthz server failed")
|
||||
if errCh != nil {
|
||||
@ -435,7 +435,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
|
||||
go wait.Until(fn, 5*time.Second, ctx.Done())
|
||||
}
|
||||
|
||||
func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
|
||||
func serveMetrics(ctx context.Context, bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
|
||||
if len(bindAddress) == 0 {
|
||||
return
|
||||
}
|
||||
@ -460,17 +460,31 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enabl
|
||||
configz.InstallHandler(proxyMux)
|
||||
|
||||
fn := func() {
|
||||
err := http.ListenAndServe(bindAddress, proxyMux)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("starting metrics server failed: %w", err)
|
||||
utilruntime.HandleError(err)
|
||||
if errCh != nil {
|
||||
errCh <- err
|
||||
// if in hardfail mode, never retry again
|
||||
blockCh := make(chan error)
|
||||
<-blockCh
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
err = fmt.Errorf("starting metrics server failed: %w", err)
|
||||
utilruntime.HandleError(err)
|
||||
if errCh != nil {
|
||||
errCh <- err
|
||||
// if in hardfail mode, never retry again
|
||||
blockCh := make(chan error)
|
||||
<-blockCh
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
listener, err := netutils.MultiListen(ctx, "tcp", bindAddress)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
server := &http.Server{Handler: proxyMux}
|
||||
err = server.Serve(listener)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
go wait.Until(fn, 5*time.Second, wait.NeverStop)
|
||||
}
|
||||
@ -512,7 +526,7 @@ func (s *ProxyServer) Run(ctx context.Context) error {
|
||||
serveHealthz(ctx, s.HealthzServer, healthzErrCh)
|
||||
|
||||
// Start up a metrics server if requested
|
||||
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
|
||||
serveMetrics(ctx, s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
|
||||
|
||||
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
|
||||
if err != nil {
|
||||
|
@ -17,22 +17,25 @@ limitations under the License.
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
type listener interface {
|
||||
// Listen is very much like net.Listen, except the first arg (network) is
|
||||
// Listen is very much like netutils.MultiListen, except the second arg (network) is
|
||||
// fixed to be "tcp".
|
||||
Listen(addr string) (net.Listener, error)
|
||||
Listen(ctx context.Context, addrs ...string) (net.Listener, error)
|
||||
}
|
||||
|
||||
// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
type httpServerFactory interface {
|
||||
// New creates an instance of a type satisfying HTTPServer. This is
|
||||
// designed to include http.Server.
|
||||
New(addr string, handler http.Handler) httpServer
|
||||
New(handler http.Handler) httpServer
|
||||
}
|
||||
|
||||
// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
@ -45,8 +48,8 @@ type httpServer interface {
|
||||
// Implement listener in terms of net.Listen.
|
||||
type stdNetListener struct{}
|
||||
|
||||
func (stdNetListener) Listen(addr string) (net.Listener, error) {
|
||||
return net.Listen("tcp", addr)
|
||||
func (stdNetListener) Listen(ctx context.Context, addrs ...string) (net.Listener, error) {
|
||||
return netutils.MultiListen(ctx, "tcp", addrs...)
|
||||
}
|
||||
|
||||
var _ listener = stdNetListener{}
|
||||
@ -54,9 +57,8 @@ var _ listener = stdNetListener{}
|
||||
// Implement httpServerFactory in terms of http.Server.
|
||||
type stdHTTPServerFactory struct{}
|
||||
|
||||
func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
|
||||
func (stdHTTPServerFactory) New(handler http.Handler) httpServer {
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: handler,
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -54,17 +55,17 @@ func (fake *fakeListener) hasPort(addr string) bool {
|
||||
return fake.openPorts.Has(addr)
|
||||
}
|
||||
|
||||
func (fake *fakeListener) Listen(addr string) (net.Listener, error) {
|
||||
fake.openPorts.Insert(addr)
|
||||
func (fake *fakeListener) Listen(_ context.Context, addrs ...string) (net.Listener, error) {
|
||||
fake.openPorts.Insert(addrs...)
|
||||
return &fakeNetListener{
|
||||
parent: fake,
|
||||
addr: addr,
|
||||
addrs: addrs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type fakeNetListener struct {
|
||||
parent *fakeListener
|
||||
addr string
|
||||
addrs []string
|
||||
}
|
||||
|
||||
type fakeAddr struct {
|
||||
@ -82,7 +83,7 @@ func (fake *fakeNetListener) Accept() (net.Conn, error) {
|
||||
}
|
||||
|
||||
func (fake *fakeNetListener) Close() error {
|
||||
fake.parent.openPorts.Delete(fake.addr)
|
||||
fake.parent.openPorts.Delete(fake.addrs...)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -97,15 +98,13 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
|
||||
return &fakeHTTPServerFactory{}
|
||||
}
|
||||
|
||||
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
|
||||
func (fake *fakeHTTPServerFactory) New(handler http.Handler) httpServer {
|
||||
return &fakeHTTPServer{
|
||||
addr: addr,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
type fakeHTTPServer struct {
|
||||
addr string
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
@ -471,7 +470,7 @@ func TestHealthzServer(t *testing.T) {
|
||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||
|
||||
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
|
||||
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
|
||||
server := hs.httpFactory.New(healthzHandler{hs: hs})
|
||||
|
||||
hsTest := &serverTest{
|
||||
server: server,
|
||||
@ -506,7 +505,7 @@ func TestLivezServer(t *testing.T) {
|
||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||
|
||||
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
|
||||
server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
|
||||
server := hs.httpFactory.New(livezHandler{hs: hs})
|
||||
|
||||
hsTest := &serverTest{
|
||||
server: server,
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
@ -162,13 +163,13 @@ func (hs *ProxierHealthServer) NodeEligible() bool {
|
||||
}
|
||||
|
||||
// Run starts the healthz HTTP server and blocks until it exits.
|
||||
func (hs *ProxierHealthServer) Run() error {
|
||||
func (hs *ProxierHealthServer) Run(ctx context.Context) error {
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
||||
serveMux.Handle("/livez", livezHandler{hs: hs})
|
||||
server := hs.httpFactory.New(hs.addr, serveMux)
|
||||
server := hs.httpFactory.New(serveMux)
|
||||
|
||||
listener, err := hs.listener.Listen(hs.addr)
|
||||
listener, err := hs.listener.Listen(ctx, hs.addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -170,9 +171,9 @@ func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
|
||||
for _, ip := range hcs.nodeIPs {
|
||||
addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port))
|
||||
// create http server
|
||||
httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
|
||||
httpSrv := hcs.httpFactory.New(hcHandler{name: hcI.nsn, hcs: hcs})
|
||||
// start listener
|
||||
listener, err = hcs.listener.Listen(addr)
|
||||
listener, err = hcs.listener.Listen(context.TODO(), addr)
|
||||
if err != nil {
|
||||
// must close whatever have been previously opened
|
||||
// to allow a retry/or port ownership change as needed
|
||||
|
Loading…
Reference in New Issue
Block a user