Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-14 14:23:37 +00:00)

Merge pull request #116470 from alexanderConstantinescu/kep-3836-impl

[Kube-proxy]: Implement KEP-3836

Commit f34365789d

@@ -27,6 +27,7 @@ import (
 	"strings"
 	"time"
 
+	"k8s.io/kubernetes/pkg/features"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
 
 	"github.com/fsnotify/fsnotify"

@@ -877,6 +878,11 @@ func (s *ProxyServer) Run() error {
 	if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
 		nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(s.podCIDRs))
 	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.KubeProxyDrainingTerminatingNodes) {
+		nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
+			HealthServer: s.HealthzServer,
+		})
+	}
 	nodeConfig.RegisterEventHandler(s.Proxier)
 
 	go nodeConfig.Run(wait.NeverStop)

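For orientation: every handler passed to nodeConfig.RegisterEventHandler above has to satisfy the node event-handler interface from k8s.io/kubernetes/pkg/proxy/config, and the NodeEligibleHandler added later in this diff asserts exactly that with a var _ config.NodeHandler compile-time check. Roughly, and reproduced here only as a sketch of the pre-existing interface rather than as part of this change:

package config // sketch of the existing k8s.io/kubernetes/pkg/proxy/config interface

import v1 "k8s.io/api/core/v1"

// NodeHandler is the contract a node event handler has to implement.
type NodeHandler interface {
	// OnNodeAdd is called when a new Node object is observed.
	OnNodeAdd(node *v1.Node)
	// OnNodeUpdate is called when an existing Node object is modified.
	OnNodeUpdate(oldNode, node *v1.Node)
	// OnNodeDelete is called when an existing Node object is deleted.
	OnNodeDelete(node *v1.Node)
	// OnNodeSynced is called once the initial node state has been propagated.
	OnNodeSynced()
}
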
@@ -451,6 +451,14 @@ const (
 	// Add support for distributed tracing in the kubelet
 	KubeletTracing featuregate.Feature = "KubeletTracing"
 
+	// owner: @alexanderConstantinescu
+	// kep: http://kep.k8s.io/3836
+	// alpha: v1.28
+	//
+	// Implement connection draining for terminating nodes for
+	// `externalTrafficPolicy: Cluster` services.
+	KubeProxyDrainingTerminatingNodes featuregate.Feature = "KubeProxyDrainingTerminatingNodes"
+
 	// owner: @zshihang
 	// kep: https://kep.k8s.io/2800
 	// beta: v1.24

@@ -1002,6 +1010,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 
 	KubeletTracing: {Default: true, PreRelease: featuregate.Beta},
 
+	KubeProxyDrainingTerminatingNodes: {Default: false, PreRelease: featuregate.Alpha},
+
 	LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
 	LegacyServiceAccountTokenTracking: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.30

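The new gate ships disabled (alpha) by default, so the draining behaviour only takes effect when the gate is switched on explicitly. In unit tests elsewhere in the tree that is usually done per test; a minimal sketch, assuming the component-base feature-gate testing helper and a hypothetical package/test name (none of this is part of the diff):

package kubeproxy_draining_example // hypothetical package name

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

func TestWithDrainingGateEnabled(t *testing.T) {
	// Enable the alpha gate for this test only; the returned func restores the old value.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate,
		features.KubeProxyDrainingTerminatingNodes, true)()

	// ... exercise code that reaches the gated branch in ProxyServer.Run ...
}
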
@@ -27,10 +27,15 @@ import (
 
 	"github.com/google/go-cmp/cmp"
 	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/component-base/metrics/testutil"
+
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/dump"
 	"k8s.io/apimachinery/pkg/util/sets"
 
+	basemetrics "k8s.io/component-base/metrics"
+	"k8s.io/kubernetes/pkg/proxy/metrics"
 	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
 	testingclock "k8s.io/utils/clock/testing"
 )

@@ -131,6 +136,7 @@ type hcPayload struct {
 type healthzPayload struct {
 	LastUpdated string
 	CurrentTime string
+	NodeHealthy bool
 }
 
 type fakeProxierHealthChecker struct {

@@ -427,7 +433,39 @@ func tHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int,
 	}
 }
 
+type nodeTweak func(n *v1.Node)
+
+func makeNode(tweaks ...nodeTweak) *v1.Node {
+	n := &v1.Node{}
+	for _, tw := range tweaks {
+		tw(n)
+	}
+	return n
+}
+
+func tweakDeleted() nodeTweak {
+	return func(n *v1.Node) {
+		n.DeletionTimestamp = &metav1.Time{
+			Time: time.Now(),
+		}
+	}
+}
+
+func tweakTainted(key string) nodeTweak {
+	return func(n *v1.Node) {
+		n.Spec.Taints = append(n.Spec.Taints, v1.Taint{Key: key})
+	}
+}
+
+type serverTest struct {
+	server      httpServer
+	url         url
+	tracking200 int
+	tracking503 int
+}
+
 func TestHealthzServer(t *testing.T) {
+	metrics.RegisterMetrics()
 	listener := newFakeListener()
 	httpFactory := newFakeHTTPServerFactory()
 	fakeClock := testingclock.NewFakeClock(time.Now())

@@ -435,31 +473,115 @@ func TestHealthzServer(t *testing.T) {
 	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
 	server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
 
+	hsTest := &serverTest{
+		server:      server,
+		url:         healthzURL,
+		tracking200: 0,
+		tracking503: 0,
+	}
+
 	// Should return 200 "OK" by default.
-	testHealthzHandler(server, http.StatusOK, t)
+	testHTTPHandler(hsTest, http.StatusOK, t)
 
 	// Should return 200 "OK" after first update
 	hs.Updated()
-	testHealthzHandler(server, http.StatusOK, t)
+	testHTTPHandler(hsTest, http.StatusOK, t)
 
 	// Should continue to return 200 "OK" as long as no further updates are queued
 	fakeClock.Step(25 * time.Second)
-	testHealthzHandler(server, http.StatusOK, t)
+	testHTTPHandler(hsTest, http.StatusOK, t)
 
 	// Should return 503 "ServiceUnavailable" if exceed max update-processing time
 	hs.QueuedUpdate()
 	fakeClock.Step(25 * time.Second)
-	testHealthzHandler(server, http.StatusServiceUnavailable, t)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
 
 	// Should return 200 "OK" after processing update
 	hs.Updated()
 	fakeClock.Step(5 * time.Second)
-	testHealthzHandler(server, http.StatusOK, t)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 200 "OK" if we've synced a node, tainted in any other way
+	hs.SyncNode(makeNode(tweakTainted("other")))
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if we've synced a ToBeDeletedTaint node
+	hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" if we've synced a node, tainted in any other way
+	hs.SyncNode(makeNode(tweakTainted("other")))
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if we've synced a deleted node
+	hs.SyncNode(makeNode(tweakDeleted()))
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
 }
 
-func testHealthzHandler(server httpServer, status int, t *testing.T) {
-	handler := server.(*fakeHTTPServer).handler
-	req, err := http.NewRequest("GET", "/healthz", nil)
+func TestLivezServer(t *testing.T) {
+	metrics.RegisterMetrics()
+	listener := newFakeListener()
+	httpFactory := newFakeHTTPServerFactory()
+	fakeClock := testingclock.NewFakeClock(time.Now())
+
+	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
+	server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
+
+	hsTest := &serverTest{
+		server:      server,
+		url:         livezURL,
+		tracking200: 0,
+		tracking503: 0,
+	}
+
+	// Should return 200 "OK" by default.
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 200 "OK" after first update
+	hs.Updated()
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should continue to return 200 "OK" as long as no further updates are queued
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 503 "ServiceUnavailable" if exceed max update-processing time
+	hs.QueuedUpdate()
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+
+	// Should return 200 "OK" after processing update
+	hs.Updated()
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakTainted("other")))
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakTainted("other")))
+	testHTTPHandler(hsTest, http.StatusOK, t)
+
+	// Should return 200 "OK" irrespective of node syncs
+	hs.SyncNode(makeNode(tweakDeleted()))
+	testHTTPHandler(hsTest, http.StatusOK, t)
+}
+
+type url string
+
+var (
+	healthzURL url = "/healthz"
+	livezURL   url = "/livez"
+)
+
+func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) {
+	handler := hsTest.server.(*fakeHTTPServer).handler
+	req, err := http.NewRequest("GET", string(hsTest.url), nil)
 	if err != nil {
 		t.Fatal(err)
 	}

@@ -474,6 +596,31 @@ func testHealthzHandler(server httpServer, status int, t *testing.T) {
 	if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
 		t.Fatal(err)
 	}
+
+	if status == http.StatusOK {
+		hsTest.tracking200++
+	}
+	if status == http.StatusServiceUnavailable {
+		hsTest.tracking503++
+	}
+	if hsTest.url == healthzURL {
+		testMetricEquals(metrics.ProxyHealthz200Total, float64(hsTest.tracking200), t)
+		testMetricEquals(metrics.ProxyHealthz503Total, float64(hsTest.tracking503), t)
+	}
+	if hsTest.url == livezURL {
+		testMetricEquals(metrics.ProxyLivez200Total, float64(hsTest.tracking200), t)
+		testMetricEquals(metrics.ProxyLivez503Total, float64(hsTest.tracking503), t)
+	}
+}
+
+func testMetricEquals(metric *basemetrics.Counter, expected float64, t *testing.T) {
+	val, err := testutil.GetCounterMetricValue(metric)
+	if err != nil {
+		t.Errorf("unable to retrieve value for metric: %s, err: %v", metric.Name, err)
+	}
+	if val != expected {
+		t.Errorf("unexpected metric: %s, expected: %v, found: %v", metric.Name, expected, val)
+	}
 }
 
 func TestServerWithSelectiveListeningAddress(t *testing.T) {

@@ -26,9 +26,16 @@ import (
 	"k8s.io/client-go/tools/events"
 	"k8s.io/klog/v2"
 	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/proxy/metrics"
 	"k8s.io/utils/clock"
 )
 
+const (
+	// ToBeDeletedTaint is a taint used by the CLuster Autoscaler before marking a node for deletion. Defined in
+	// https://github.com/kubernetes/autoscaler/blob/e80ab518340f88f364fe3ef063f8303755125971/cluster-autoscaler/utils/deletetaint/delete.go#L36
+	ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
+)
+
 // ProxierHealthUpdater allows callers to update healthz timestamp only.
 type ProxierHealthUpdater interface {
 	// QueuedUpdate should be called when the proxier receives a Service or Endpoints

@@ -42,6 +49,10 @@ type ProxierHealthUpdater interface {
 	// Run starts the healthz HTTP server and blocks until it exits.
 	Run() error
 
+	// Sync the node and determine if its eligible or not. Eligible is
+	// defined as being: not tainted by ToBeDeletedTaint and not deleted.
+	SyncNode(node *v1.Node)
+
 	proxierHealthChecker
 }
 

@@ -62,6 +73,7 @@ type proxierHealthServer struct {
 
 	lastUpdated         atomic.Value
 	oldestPendingQueued atomic.Value
+	nodeEligible        atomic.Bool
 }
 
 // NewProxierHealthServer returns a proxier health http server.

@@ -70,7 +82,7 @@ func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder e
 }
 
 func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
-	return &proxierHealthServer{
+	hs := &proxierHealthServer{
 		listener:    listener,
 		httpFactory: httpServerFactory,
 		clock:       c,

@@ -79,6 +91,11 @@ func newProxierHealthServer(listener listener, httpServerFactory httpServerFacto
 		recorder:    recorder,
 		nodeRef:     nodeRef,
 	}
+	// The node is eligible (and thus the proxy healthy) while it's starting up
+	// and until we've processed the first node event that indicates the
+	// contrary.
+	hs.nodeEligible.Store(true)
+	return hs
 }
 
 // Updated indicates that kube-proxy has successfully updated its backend, so it should

@@ -96,8 +113,8 @@ func (hs *proxierHealthServer) QueuedUpdate() {
 	hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now())
 }
 
-// IsHealthy returns the proxier's health state, following the same definition
-// the HTTP server defines.
+// IsHealthy returns only the proxier's health state, following the same
+// definition the HTTP server defines, but ignoring the state of the Node.
 func (hs *proxierHealthServer) IsHealthy() bool {
 	isHealthy, _, _ := hs.isHealthy()
 	return isHealthy

@@ -123,14 +140,28 @@ func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) {
 		// There's an unprocessed update queued, but it's not late yet
 		healthy = true
 	}
 
 	return healthy, lastUpdated, currentTime
 }
 
+func (hs *proxierHealthServer) SyncNode(node *v1.Node) {
+	if !node.DeletionTimestamp.IsZero() {
+		hs.nodeEligible.Store(false)
+		return
+	}
+	for _, taint := range node.Spec.Taints {
+		if taint.Key == ToBeDeletedTaint {
+			hs.nodeEligible.Store(false)
+			return
+		}
+	}
+	hs.nodeEligible.Store(true)
+}
+
 // Run starts the healthz HTTP server and blocks until it exits.
 func (hs *proxierHealthServer) Run() error {
 	serveMux := http.NewServeMux()
 	serveMux.Handle("/healthz", healthzHandler{hs: hs})
+	serveMux.Handle("/livez", livezHandler{hs: hs})
 	server := hs.httpFactory.New(hs.addr, serveMux)
 
 	listener, err := hs.listener.Listen(hs.addr)

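SyncNode's rule is deliberately small: a node becomes ineligible as soon as it has a deletion timestamp or carries the ToBeDeletedByClusterAutoscaler taint, and is eligible otherwise; eligibility only feeds /healthz (wired up in Run above), never /livez. A package-internal sketch of exercising it directly, assuming the fakes and tweak helpers added to the test file earlier in this diff (illustrative only; the PR's own tests assert via HTTP status codes instead):

func TestSyncNodeEligibilitySketch(t *testing.T) {
	// newFakeListener, newFakeHTTPServerFactory, makeNode and tweakTainted
	// come from the healthcheck package's test file shown earlier in this diff.
	listener := newFakeListener()
	httpFactory := newFakeHTTPServerFactory()
	fakeClock := testingclock.NewFakeClock(time.Now())
	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)

	// An unrelated taint leaves the node eligible.
	hs.SyncNode(makeNode(tweakTainted("some-other-taint")))
	if !hs.nodeEligible.Load() {
		t.Fatal("expected node to stay eligible for an unrelated taint")
	}

	// The cluster-autoscaler deletion taint makes it ineligible, flipping /healthz to 503.
	hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
	if hs.nodeEligible.Load() {
		t.Fatal("expected node to become ineligible once ToBeDeletedTaint is present")
	}
}
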
@@ -156,12 +187,40 @@ type healthzHandler struct {
 }
 
 func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	nodeEligible := h.hs.nodeEligible.Load()
+	healthy, lastUpdated, currentTime := h.hs.isHealthy()
+	healthy = healthy && nodeEligible
+	resp.Header().Set("Content-Type", "application/json")
+	resp.Header().Set("X-Content-Type-Options", "nosniff")
+	if !healthy {
+		metrics.ProxyHealthz503Total.Inc()
+		resp.WriteHeader(http.StatusServiceUnavailable)
+	} else {
+		metrics.ProxyHealthz200Total.Inc()
+		resp.WriteHeader(http.StatusOK)
+		// In older releases, the returned "lastUpdated" time indicated the last
+		// time the proxier sync loop ran, even if nothing had changed. To
+		// preserve compatibility, we use the same semantics: the returned
+		// lastUpdated value is "recent" if the server is healthy. The kube-proxy
+		// metrics provide more detailed information.
+		lastUpdated = currentTime
+	}
+	fmt.Fprintf(resp, `{"lastUpdated": %q,"currentTime": %q, "nodeEligible": %v}`, lastUpdated, currentTime, nodeEligible)
+}
+
+type livezHandler struct {
+	hs *proxierHealthServer
+}
+
+func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	healthy, lastUpdated, currentTime := h.hs.isHealthy()
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("X-Content-Type-Options", "nosniff")
 	if !healthy {
+		metrics.ProxyLivez503Total.Inc()
 		resp.WriteHeader(http.StatusServiceUnavailable)
 	} else {
+		metrics.ProxyLivez200Total.Inc()
 		resp.WriteHeader(http.StatusOK)
 		// In older releases, the returned "lastUpdated" time indicated the last
 		// time the proxier sync loop ran, even if nothing had changed. To

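After this change /healthz and /livez diverge: /healthz additionally requires the node to be eligible (not deleted, not tainted for deletion) and reports that in its JSON body, while /livez keeps the old behaviour. A rough client-side sketch of probing the default endpoint; the payload field names follow the fmt.Fprintf format string above, but the client itself is illustrative and not part of the change:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// proxyHealthz mirrors the JSON written by healthzHandler.ServeHTTP.
type proxyHealthz struct {
	LastUpdated  string `json:"lastUpdated"`
	CurrentTime  string `json:"currentTime"`
	NodeEligible bool   `json:"nodeEligible"`
}

func main() {
	resp, err := http.Get("http://127.0.0.1:10256/healthz") // default kube-proxy healthz address
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var hz proxyHealthz
	if err := json.NewDecoder(resp.Body).Decode(&hz); err != nil {
		panic(err)
	}
	// 200: the proxier is up to date and the node is eligible.
	// 503: either a stale proxier or a node that is being drained/deleted.
	fmt.Println(resp.StatusCode, hz.NodeEligible)
}
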
@@ -652,6 +652,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
 			"eventNode", node.Name, "currentNode", proxier.hostname)
 		return
 	}
+
 	proxier.mu.Lock()
 	proxier.nodeLabels = nil
 	proxier.needFullSync = true

@@ -902,6 +902,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
 		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
 		return
 	}
+
 	proxier.mu.Lock()
 	proxier.nodeLabels = nil
 	proxier.mu.Unlock()

@@ -184,6 +184,50 @@ var (
 		[]string{"table"},
 	)
 
+	// ProxyHealthz200Total is the number of returned HTTP Status 200 for each
+	// healthz probe.
+	ProxyHealthz200Total = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      kubeProxySubsystem,
+			Name:           "proxy_healthz_200_total",
+			Help:           "Cumulative proxy healthz HTTP status 200",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// ProxyHealthz503Total is the number of returned HTTP Status 503 for each
+	// healthz probe.
+	ProxyHealthz503Total = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      kubeProxySubsystem,
+			Name:           "proxy_healthz_503_total",
+			Help:           "Cumulative proxy healthz HTTP status 503",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// ProxyLivez200Total is the number of returned HTTP Status 200 for each
+	// livez probe.
+	ProxyLivez200Total = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      kubeProxySubsystem,
+			Name:           "proxy_livez_200_total",
+			Help:           "Cumulative proxy livez HTTP status 200",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// ProxyLivez503Total is the number of returned HTTP Status 503 for each
+	// livez probe.
+	ProxyLivez503Total = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      kubeProxySubsystem,
+			Name:           "proxy_livez_503_total",
+			Help:           "Cumulative proxy livez HTTP status 503",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
 	// SyncProxyRulesLastQueuedTimestamp is the last time a proxy sync was
 	// requested. If this is much larger than
 	// kubeproxy_sync_proxy_rules_last_timestamp_seconds, then something is hung.

@@ -230,6 +274,11 @@ func RegisterMetrics() {
 		legacyregistry.MustRegister(IptablesPartialRestoreFailuresTotal)
 		legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp)
 		legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal)
+		legacyregistry.MustRegister(ProxyHealthz200Total)
+		legacyregistry.MustRegister(ProxyHealthz503Total)
+		legacyregistry.MustRegister(ProxyLivez200Total)
+		legacyregistry.MustRegister(ProxyLivez503Total)
+
 	})
 }
 

@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/proxy/config"
+	"k8s.io/kubernetes/pkg/proxy/healthcheck"
 )
 
 // NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned

@@ -85,3 +86,23 @@ func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) {
 
 // OnNodeSynced is a handler for Node syncs.
 func (n *NodePodCIDRHandler) OnNodeSynced() {}
+
+// NodeEligibleHandler handles the life cycle of the Node's eligibility, as
+// determined by the health server for directing load balancer traffic.
+type NodeEligibleHandler struct {
+	HealthServer healthcheck.ProxierHealthUpdater
+}
+
+var _ config.NodeHandler = &NodeEligibleHandler{}
+
+// OnNodeAdd is a handler for Node creates.
+func (n *NodeEligibleHandler) OnNodeAdd(node *v1.Node) { n.HealthServer.SyncNode(node) }
+
+// OnNodeUpdate is a handler for Node updates.
+func (n *NodeEligibleHandler) OnNodeUpdate(_, node *v1.Node) { n.HealthServer.SyncNode(node) }
+
+// OnNodeDelete is a handler for Node deletes.
+func (n *NodeEligibleHandler) OnNodeDelete(node *v1.Node) { n.HealthServer.SyncNode(node) }
+
+// OnNodeSynced is a handler for Node syncs.
+func (n *NodeEligibleHandler) OnNodeSynced() {}