mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Better distinguish the two kinds of proxy health check servers
Kube-proxy runs two different health servers; one for monitoring the health of kube-proxy itself, and one for monitoring the health of specific services. Rename them to "ProxierHealthServer" and "ServiceHealthServer" to make this clearer, and do a bit of API cleanup too.
This commit is contained in:
parent
ded22e3975
commit
0f10102c16
@ -482,7 +482,7 @@ type ProxyServer struct {
|
||||
UseEndpointSlices bool
|
||||
OOMScoreAdj *int32
|
||||
ConfigSyncPeriod time.Duration
|
||||
HealthzServer *healthcheck.HealthzServer
|
||||
HealthzServer *healthcheck.ProxierHealthServer
|
||||
}
|
||||
|
||||
// createClients creates a kube client and an event client from the given config and masterOverride.
|
||||
|
@ -125,11 +125,9 @@ func newProxyServer(
|
||||
Namespace: "",
|
||||
}
|
||||
|
||||
var healthzServer *healthcheck.HealthzServer
|
||||
var healthzUpdater healthcheck.HealthzUpdater
|
||||
var healthzServer *healthcheck.ProxierHealthServer
|
||||
if len(config.HealthzBindAddress) > 0 {
|
||||
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
healthzUpdater = healthzServer
|
||||
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
}
|
||||
|
||||
var proxier proxy.Provider
|
||||
@ -162,7 +160,7 @@ func newProxyServer(
|
||||
hostname,
|
||||
nodeIP,
|
||||
recorder,
|
||||
healthzUpdater,
|
||||
healthzServer,
|
||||
config.NodePortAddresses,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -87,11 +87,9 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
|
||||
Namespace: "",
|
||||
}
|
||||
|
||||
var healthzServer *healthcheck.HealthzServer
|
||||
var healthzUpdater healthcheck.HealthzUpdater
|
||||
var healthzServer *healthcheck.ProxierHealthServer
|
||||
if len(config.HealthzBindAddress) > 0 {
|
||||
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
healthzUpdater = healthzServer
|
||||
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
}
|
||||
|
||||
var proxier proxy.Provider
|
||||
@ -108,7 +106,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
|
||||
hostname,
|
||||
utilnode.GetNodeIP(client, hostname),
|
||||
recorder,
|
||||
healthzUpdater,
|
||||
healthzServer,
|
||||
config.Winkernel,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -9,8 +9,10 @@ load(
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"common.go",
|
||||
"doc.go",
|
||||
"healthcheck.go",
|
||||
"proxier_health.go",
|
||||
"service_health.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/proxy/healthcheck",
|
||||
deps = [
|
||||
|
63
pkg/proxy/healthcheck/common.go
Normal file
63
pkg/proxy/healthcheck/common.go
Normal file
@ -0,0 +1,63 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
type listener interface {
|
||||
// Listen is very much like net.Listen, except the first arg (network) is
|
||||
// fixed to be "tcp".
|
||||
Listen(addr string) (net.Listener, error)
|
||||
}
|
||||
|
||||
// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
type httpServerFactory interface {
|
||||
// New creates an instance of a type satisfying HTTPServer. This is
|
||||
// designed to include http.Server.
|
||||
New(addr string, handler http.Handler) httpServer
|
||||
}
|
||||
|
||||
// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
// It is designed so that http.Server satisfies this interface,
|
||||
type httpServer interface {
|
||||
Serve(listener net.Listener) error
|
||||
}
|
||||
|
||||
// Implement listener in terms of net.Listen.
|
||||
type stdNetListener struct{}
|
||||
|
||||
func (stdNetListener) Listen(addr string) (net.Listener, error) {
|
||||
return net.Listen("tcp", addr)
|
||||
}
|
||||
|
||||
var _ listener = stdNetListener{}
|
||||
|
||||
// Implement httpServerFactory in terms of http.Server.
|
||||
type stdHTTPServerFactory struct{}
|
||||
|
||||
func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
var _ httpServerFactory = stdHTTPServerFactory{}
|
@ -79,7 +79,7 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
|
||||
return &fakeHTTPServerFactory{}
|
||||
}
|
||||
|
||||
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
|
||||
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
|
||||
return &fakeHTTPServer{
|
||||
addr: addr,
|
||||
handler: handler,
|
||||
@ -119,7 +119,7 @@ func TestServer(t *testing.T) {
|
||||
listener := newFakeListener()
|
||||
httpFactory := newFakeHTTPServerFactory()
|
||||
|
||||
hcsi := NewServer("hostname", nil, listener, httpFactory)
|
||||
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory)
|
||||
hcs := hcsi.(*server)
|
||||
if len(hcs.services) != 0 {
|
||||
t.Errorf("expected 0 services, got %d", len(hcs.services))
|
||||
@ -368,7 +368,7 @@ func TestHealthzServer(t *testing.T) {
|
||||
httpFactory := newFakeHTTPServerFactory()
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
|
||||
hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
|
||||
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
|
||||
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
|
||||
|
||||
// Should return 200 "OK" by default.
|
||||
@ -385,7 +385,7 @@ func TestHealthzServer(t *testing.T) {
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
}
|
||||
|
||||
func testHealthzHandler(server HTTPServer, status int, t *testing.T) {
|
||||
func testHealthzHandler(server httpServer, status int, t *testing.T) {
|
||||
handler := server.(*fakeHTTPServer).handler
|
||||
req, err := http.NewRequest("GET", "/healthz", nil)
|
||||
if err != nil {
|
||||
|
126
pkg/proxy/healthcheck/proxier_health.go
Normal file
126
pkg/proxy/healthcheck/proxier_health.go
Normal file
@ -0,0 +1,126 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
)
|
||||
|
||||
var proxierHealthzRetryInterval = 60 * time.Second
|
||||
|
||||
// ProxierHealthUpdater allows callers to update healthz timestamp only.
|
||||
type ProxierHealthUpdater interface {
|
||||
UpdateTimestamp()
|
||||
}
|
||||
|
||||
// ProxierHealthServer returns 200 "OK" by default. Once timestamp has been
|
||||
// updated, it verifies we don't exceed max no respond duration since
|
||||
// last update.
|
||||
type ProxierHealthServer struct {
|
||||
listener listener
|
||||
httpFactory httpServerFactory
|
||||
clock clock.Clock
|
||||
|
||||
addr string
|
||||
port int32
|
||||
healthTimeout time.Duration
|
||||
recorder record.EventRecorder
|
||||
nodeRef *v1.ObjectReference
|
||||
|
||||
lastUpdated atomic.Value
|
||||
}
|
||||
|
||||
// NewProxierHealthServer returns a proxier health http server.
|
||||
func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *ProxierHealthServer {
|
||||
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
|
||||
}
|
||||
|
||||
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *ProxierHealthServer {
|
||||
return &ProxierHealthServer{
|
||||
listener: listener,
|
||||
httpFactory: httpServerFactory,
|
||||
clock: c,
|
||||
addr: addr,
|
||||
healthTimeout: healthTimeout,
|
||||
recorder: recorder,
|
||||
nodeRef: nodeRef,
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateTimestamp updates the lastUpdated timestamp.
|
||||
func (hs *ProxierHealthServer) UpdateTimestamp() {
|
||||
hs.lastUpdated.Store(hs.clock.Now())
|
||||
}
|
||||
|
||||
// Run starts the healthz http server and returns.
|
||||
func (hs *ProxierHealthServer) Run() {
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
||||
server := hs.httpFactory.New(hs.addr, serveMux)
|
||||
|
||||
go wait.Until(func() {
|
||||
klog.V(3).Infof("Starting goroutine for proxier healthz on %s", hs.addr)
|
||||
|
||||
listener, err := hs.listener.Listen(hs.addr)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed to start proxier healthz on %s: %v", hs.addr, err)
|
||||
if hs.recorder != nil {
|
||||
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
|
||||
}
|
||||
klog.Error(msg)
|
||||
return
|
||||
}
|
||||
|
||||
if err := server.Serve(listener); err != nil {
|
||||
klog.Errorf("Proxier healthz closed with error: %v", err)
|
||||
return
|
||||
}
|
||||
klog.Error("Unexpected proxier healthz closed.")
|
||||
}, proxierHealthzRetryInterval, wait.NeverStop)
|
||||
}
|
||||
|
||||
type healthzHandler struct {
|
||||
hs *ProxierHealthServer
|
||||
}
|
||||
|
||||
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
|
||||
lastUpdated := time.Time{}
|
||||
if val := h.hs.lastUpdated.Load(); val != nil {
|
||||
lastUpdated = val.(time.Time)
|
||||
}
|
||||
currentTime := h.hs.clock.Now()
|
||||
|
||||
resp.Header().Set("Content-Type", "application/json")
|
||||
resp.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
|
||||
resp.WriteHeader(http.StatusServiceUnavailable)
|
||||
} else {
|
||||
resp.WriteHeader(http.StatusOK)
|
||||
}
|
||||
fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
|
||||
}
|
@ -22,27 +22,21 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/lithammer/dedent"
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
)
|
||||
|
||||
var nodeHealthzRetryInterval = 60 * time.Second
|
||||
|
||||
// Server serves HTTP endpoints for each service name, with results
|
||||
// ServiceHealthServer serves HTTP endpoints for each service name, with results
|
||||
// based on the endpoints. If there are 0 endpoints for a service, it returns a
|
||||
// 503 "Service Unavailable" error (telling LBs not to use this node). If there
|
||||
// are 1 or more endpoints, it returns a 200 "OK".
|
||||
type Server interface {
|
||||
type ServiceHealthServer interface {
|
||||
// Make the new set of services be active. Services that were open before
|
||||
// will be closed. Services that are new will be opened. Service that
|
||||
// existed and are in the new set will be left alone. The value of the map
|
||||
@ -54,73 +48,26 @@ type Server interface {
|
||||
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
|
||||
}
|
||||
|
||||
// Listener allows for testing of Server. If the Listener argument
|
||||
// to NewServer() is nil, the real net.Listen function will be used.
|
||||
type Listener interface {
|
||||
// Listen is very much like net.Listen, except the first arg (network) is
|
||||
// fixed to be "tcp".
|
||||
Listen(addr string) (net.Listener, error)
|
||||
}
|
||||
|
||||
// HTTPServerFactory allows for testing of Server. If the
|
||||
// HTTPServerFactory argument to NewServer() is nil, the real
|
||||
// http.Server type will be used.
|
||||
type HTTPServerFactory interface {
|
||||
// New creates an instance of a type satisfying HTTPServer. This is
|
||||
// designed to include http.Server.
|
||||
New(addr string, handler http.Handler) HTTPServer
|
||||
}
|
||||
|
||||
// HTTPServer allows for testing of Server.
|
||||
type HTTPServer interface {
|
||||
// Server is designed so that http.Server satisfies this interface,
|
||||
Serve(listener net.Listener) error
|
||||
}
|
||||
|
||||
// NewServer allocates a new healthcheck server manager. If either
|
||||
// of the injected arguments are nil, defaults will be used.
|
||||
func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server {
|
||||
if listener == nil {
|
||||
listener = stdNetListener{}
|
||||
}
|
||||
if httpServerFactory == nil {
|
||||
httpServerFactory = stdHTTPServerFactory{}
|
||||
}
|
||||
func newServiceHealthServer(hostname string, recorder record.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer {
|
||||
return &server{
|
||||
hostname: hostname,
|
||||
recorder: recorder,
|
||||
listener: listener,
|
||||
httpFactory: httpServerFactory,
|
||||
httpFactory: factory,
|
||||
services: map[types.NamespacedName]*hcInstance{},
|
||||
}
|
||||
}
|
||||
|
||||
// Implement Listener in terms of net.Listen.
|
||||
type stdNetListener struct{}
|
||||
|
||||
func (stdNetListener) Listen(addr string) (net.Listener, error) {
|
||||
return net.Listen("tcp", addr)
|
||||
// NewServiceHealthServer allocates a new service healthcheck server manager
|
||||
func NewServiceHealthServer(hostname string, recorder record.EventRecorder) ServiceHealthServer {
|
||||
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{})
|
||||
}
|
||||
|
||||
var _ Listener = stdNetListener{}
|
||||
|
||||
// Implement HTTPServerFactory in terms of http.Server.
|
||||
type stdHTTPServerFactory struct{}
|
||||
|
||||
func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
var _ HTTPServerFactory = stdHTTPServerFactory{}
|
||||
|
||||
type server struct {
|
||||
hostname string
|
||||
recorder record.EventRecorder // can be nil
|
||||
listener Listener
|
||||
httpFactory HTTPServerFactory
|
||||
listener listener
|
||||
httpFactory httpServerFactory
|
||||
|
||||
lock sync.RWMutex
|
||||
services map[types.NamespacedName]*hcInstance
|
||||
@ -187,7 +134,7 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
|
||||
type hcInstance struct {
|
||||
port uint16
|
||||
listener net.Listener
|
||||
server HTTPServer
|
||||
server httpServer
|
||||
endpoints int // number of local endpoints for a service
|
||||
}
|
||||
|
||||
@ -247,103 +194,20 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// HealthzUpdater allows callers to update healthz timestamp only.
|
||||
type HealthzUpdater interface {
|
||||
UpdateTimestamp()
|
||||
// FakeServiceHealthServer is a fake ServiceHealthServer for test programs
|
||||
type FakeServiceHealthServer struct{}
|
||||
|
||||
// NewFakeServiceHealthServer allocates a new fake service healthcheck server manager
|
||||
func NewFakeServiceHealthServer() ServiceHealthServer {
|
||||
return FakeServiceHealthServer{}
|
||||
}
|
||||
|
||||
// HealthzServer returns 200 "OK" by default. Once timestamp has been
|
||||
// updated, it verifies we don't exceed max no respond duration since
|
||||
// last update.
|
||||
type HealthzServer struct {
|
||||
listener Listener
|
||||
httpFactory HTTPServerFactory
|
||||
clock clock.Clock
|
||||
|
||||
addr string
|
||||
port int32
|
||||
healthTimeout time.Duration
|
||||
recorder record.EventRecorder
|
||||
nodeRef *v1.ObjectReference
|
||||
|
||||
lastUpdated atomic.Value
|
||||
// SyncServices is part of ServiceHealthServer
|
||||
func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewDefaultHealthzServer returns a default healthz http server.
|
||||
func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
|
||||
return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef)
|
||||
}
|
||||
|
||||
func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
|
||||
if listener == nil {
|
||||
listener = stdNetListener{}
|
||||
}
|
||||
if httpServerFactory == nil {
|
||||
httpServerFactory = stdHTTPServerFactory{}
|
||||
}
|
||||
if c == nil {
|
||||
c = clock.RealClock{}
|
||||
}
|
||||
return &HealthzServer{
|
||||
listener: listener,
|
||||
httpFactory: httpServerFactory,
|
||||
clock: c,
|
||||
addr: addr,
|
||||
healthTimeout: healthTimeout,
|
||||
recorder: recorder,
|
||||
nodeRef: nodeRef,
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateTimestamp updates the lastUpdated timestamp.
|
||||
func (hs *HealthzServer) UpdateTimestamp() {
|
||||
hs.lastUpdated.Store(hs.clock.Now())
|
||||
}
|
||||
|
||||
// Run starts the healthz http server and returns.
|
||||
func (hs *HealthzServer) Run() {
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
||||
server := hs.httpFactory.New(hs.addr, serveMux)
|
||||
|
||||
go wait.Until(func() {
|
||||
klog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
|
||||
|
||||
listener, err := hs.listener.Listen(hs.addr)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err)
|
||||
if hs.recorder != nil {
|
||||
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg)
|
||||
}
|
||||
klog.Error(msg)
|
||||
return
|
||||
}
|
||||
|
||||
if err := server.Serve(listener); err != nil {
|
||||
klog.Errorf("Healthz closed with error: %v", err)
|
||||
return
|
||||
}
|
||||
klog.Error("Unexpected healthz closed.")
|
||||
}, nodeHealthzRetryInterval, wait.NeverStop)
|
||||
}
|
||||
|
||||
type healthzHandler struct {
|
||||
hs *HealthzServer
|
||||
}
|
||||
|
||||
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
|
||||
lastUpdated := time.Time{}
|
||||
if val := h.hs.lastUpdated.Load(); val != nil {
|
||||
lastUpdated = val.(time.Time)
|
||||
}
|
||||
currentTime := h.hs.clock.Now()
|
||||
|
||||
resp.Header().Set("Content-Type", "application/json")
|
||||
resp.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
|
||||
resp.WriteHeader(http.StatusServiceUnavailable)
|
||||
} else {
|
||||
resp.WriteHeader(http.StatusOK)
|
||||
}
|
||||
fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
|
||||
// SyncEndpoints is part of ServiceHealthServer
|
||||
func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error {
|
||||
return nil
|
||||
}
|
@ -38,6 +38,7 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/proxy/util/testing:go_default_library",
|
||||
"//pkg/util/async:go_default_library",
|
||||
|
@ -200,8 +200,9 @@ type Proxier struct {
|
||||
nodeIP net.IP
|
||||
portMapper utilproxy.PortOpener
|
||||
recorder record.EventRecorder
|
||||
healthChecker healthcheck.Server
|
||||
healthzServer healthcheck.HealthzUpdater
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
healthzServer healthcheck.ProxierHealthUpdater
|
||||
|
||||
// Since converting probabilities (floats) to strings is expensive
|
||||
// and we are using only probabilities in the format of 1/n, we are
|
||||
@ -257,7 +258,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
hostname string,
|
||||
nodeIP net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
nodePortAddresses []string,
|
||||
) (*Proxier, error) {
|
||||
// Set the route_localnet sysctl we need for
|
||||
@ -291,7 +292,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
|
||||
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
|
||||
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
||||
|
||||
isIPv6 := ipt.IsIpv6()
|
||||
proxier := &Proxier{
|
||||
@ -309,7 +310,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
@ -1449,19 +1450,18 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
proxier.portsMap = replacementPortsMap
|
||||
|
||||
// Update healthz timestamp.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
}
|
||||
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
// Update healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck services: %v", err)
|
||||
}
|
||||
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
@ -342,28 +343,6 @@ func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Close
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type fakeHealthChecker struct {
|
||||
services map[types.NamespacedName]uint16
|
||||
endpoints map[types.NamespacedName]int
|
||||
}
|
||||
|
||||
func newFakeHealthChecker() *fakeHealthChecker {
|
||||
return &fakeHealthChecker{
|
||||
services: map[types.NamespacedName]uint16{},
|
||||
endpoints: map[types.NamespacedName]int{},
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
|
||||
fake.services = newServices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
|
||||
fake.endpoints = newEndpoints
|
||||
return nil
|
||||
}
|
||||
|
||||
const testHostname = "test-hostname"
|
||||
|
||||
func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier {
|
||||
@ -380,7 +359,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
existingFilterChainsData: bytes.NewBuffer(nil),
|
||||
|
@ -16,6 +16,7 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/ipvs/testing:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/proxy/util/testing:go_default_library",
|
||||
|
@ -227,9 +227,11 @@ type Proxier struct {
|
||||
nodeIP net.IP
|
||||
portMapper utilproxy.PortOpener
|
||||
recorder record.EventRecorder
|
||||
healthChecker healthcheck.Server
|
||||
healthzServer healthcheck.HealthzUpdater
|
||||
ipvsScheduler string
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
healthzServer healthcheck.ProxierHealthUpdater
|
||||
|
||||
ipvsScheduler string
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
ipGetter IPGetter
|
||||
// The following buffers are used to reuse memory and avoid allocations
|
||||
@ -328,7 +330,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
hostname string,
|
||||
nodeIP net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
scheduler string,
|
||||
nodePortAddresses []string,
|
||||
) (*Proxier, error) {
|
||||
@ -421,7 +423,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
scheduler = DefaultScheduler
|
||||
}
|
||||
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
||||
|
||||
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
|
||||
|
||||
@ -443,7 +445,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
ipvs: ipvs,
|
||||
ipvsScheduler: scheduler,
|
||||
@ -489,7 +491,7 @@ func NewDualStackProxier(
|
||||
hostname string,
|
||||
nodeIP [2]net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
scheduler string,
|
||||
nodePortAddresses []string,
|
||||
) (proxy.Provider, error) {
|
||||
@ -1489,19 +1491,18 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
|
||||
|
||||
// Update healthz timestamp
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
}
|
||||
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
// Update healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck services: %v", err)
|
||||
}
|
||||
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||
@ -59,18 +60,6 @@ func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) {
|
||||
return f.nodeIPs, nil
|
||||
}
|
||||
|
||||
type fakeHealthChecker struct {
|
||||
services map[types.NamespacedName]uint16
|
||||
Endpoints map[types.NamespacedName]int
|
||||
}
|
||||
|
||||
func newFakeHealthChecker() *fakeHealthChecker {
|
||||
return &fakeHealthChecker{
|
||||
services: map[types.NamespacedName]uint16{},
|
||||
Endpoints: map[types.NamespacedName]int{},
|
||||
}
|
||||
}
|
||||
|
||||
// fakePortOpener implements portOpener.
|
||||
type fakePortOpener struct {
|
||||
openPorts []*utilproxy.LocalPort
|
||||
@ -83,16 +72,6 @@ func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Close
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
|
||||
fake.services = newServices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
|
||||
fake.Endpoints = newEndpoints
|
||||
return nil
|
||||
}
|
||||
|
||||
// fakeKernelHandler implements KernelHandler.
|
||||
type fakeKernelHandler struct {
|
||||
modules []string
|
||||
@ -151,7 +130,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
ipvsScheduler: DefaultScheduler,
|
||||
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
|
@ -62,6 +62,7 @@ go_test(
|
||||
deps = select({
|
||||
"@io_bazel_rules_go//go/platform:windows": [
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
@ -471,8 +471,9 @@ type Proxier struct {
|
||||
hostname string
|
||||
nodeIP net.IP
|
||||
recorder record.EventRecorder
|
||||
healthChecker healthcheck.Server
|
||||
healthzServer healthcheck.HealthzUpdater
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
healthzServer healthcheck.ProxierHealthUpdater
|
||||
|
||||
// Since converting probabilities (floats) to strings is expensive
|
||||
// and we are using only probabilities in the format of 1/n, we are
|
||||
@ -527,7 +528,7 @@ func NewProxier(
|
||||
hostname string,
|
||||
nodeIP net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
config config.KubeProxyWinkernelConfiguration,
|
||||
) (*Proxier, error) {
|
||||
masqueradeValue := 1 << uint(masqueradeBit)
|
||||
@ -542,7 +543,7 @@ func NewProxier(
|
||||
klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
|
||||
}
|
||||
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
||||
var hns HostNetworkService
|
||||
hns = hnsV1{}
|
||||
supportedFeatures := hcn.GetSupportedFeatures()
|
||||
@ -622,24 +623,24 @@ func NewProxier(
|
||||
}
|
||||
|
||||
proxier := &Proxier{
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
healthzServer: healthzServer,
|
||||
hns: hns,
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: hostMac,
|
||||
isDSR: isDSR,
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
recorder: recorder,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
hns: hns,
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: hostMac,
|
||||
isDSR: isDSR,
|
||||
}
|
||||
|
||||
burstSyncs := 2
|
||||
@ -1276,19 +1277,18 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
|
||||
}
|
||||
|
||||
// Update healthz timestamp.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
}
|
||||
SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
// Update healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.hcServices); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck services: %v", err)
|
||||
}
|
||||
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
discovery "k8s.io/api/discovery/v1alpha1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
|
||||
"net"
|
||||
"strings"
|
||||
@ -39,27 +40,6 @@ const destinationPrefix = "192.168.2.0/24"
|
||||
const providerAddress = "10.0.0.3"
|
||||
const guid = "123ABC"
|
||||
|
||||
type fakeHealthChecker struct {
|
||||
services map[types.NamespacedName]uint16
|
||||
endpoints map[types.NamespacedName]int
|
||||
}
|
||||
|
||||
func newFakeHealthChecker() *fakeHealthChecker {
|
||||
return &fakeHealthChecker{
|
||||
services: map[types.NamespacedName]uint16{},
|
||||
endpoints: map[types.NamespacedName]int{},
|
||||
}
|
||||
}
|
||||
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
|
||||
fake.services = newServices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
|
||||
fake.endpoints = newEndpoints
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeHNS struct{}
|
||||
|
||||
func newFakeHNS() *fakeHNS {
|
||||
@ -126,20 +106,20 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
|
||||
networkType: networkType,
|
||||
}
|
||||
proxier := &Proxier{
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: testHostName,
|
||||
nodeIP: nodeIP,
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: macAddress,
|
||||
isDSR: false,
|
||||
hns: newFakeHNS(),
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: testHostName,
|
||||
nodeIP: nodeIP,
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: macAddress,
|
||||
isDSR: false,
|
||||
hns: newFakeHNS(),
|
||||
}
|
||||
return proxier
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user