Implement metrics agreed on the KEP

This commit is contained in:
Alexander Constantinescu 2023-03-28 18:06:10 +02:00
parent 9b1c4c7b57
commit 08dd657a71
3 changed files with 126 additions and 21 deletions

View File

@ -28,11 +28,14 @@ import (
"github.com/google/go-cmp/cmp"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/apimachinery/pkg/util/sets"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
testingclock "k8s.io/utils/clock/testing"
)
@ -454,7 +457,15 @@ func tweakTainted(key string) nodeTweak {
}
}
// serverTest bundles everything testHTTPHandler needs to probe one endpoint
// of the fake health server and to cross-check the probe counters afterwards.
type serverTest struct {
	// server is the HTTP server whose handler receives the probe request.
	server httpServer
	// url is the probe path requested on every call.
	url url
	// tracking200 and tracking503 are running tallies of the probes issued so
	// far for which the test expected HTTP 200 and HTTP 503 respectively; they
	// are compared against the corresponding counter metrics.
	tracking200, tracking503 int
}
func TestHealthzServer(t *testing.T) {
metrics.RegisterMetrics()
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
@ -462,45 +473,53 @@ func TestHealthzServer(t *testing.T) {
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
hsTest := &serverTest{
server: server,
url: healthzURL,
tracking200: 0,
tracking503: 0,
}
// Should return 200 "OK" by default.
testHTTPHandler(server, healthzURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" after first update
hs.Updated()
testHTTPHandler(server, healthzURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should continue to return 200 "OK" as long as no further updates are queued
fakeClock.Step(25 * time.Second)
testHTTPHandler(server, healthzURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if exceed max update-processing time
hs.QueuedUpdate()
fakeClock.Step(25 * time.Second)
testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update
hs.Updated()
fakeClock.Step(5 * time.Second)
testHTTPHandler(server, healthzURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" if we've synced a node, tainted in any other way
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, healthzURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if we've synced a ToBeDeletedTaint node
hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" if we've synced a node, tainted in any other way
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, healthzURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if we've synced a deleted node
hs.SyncNode(makeNode(tweakDeleted()))
testHTTPHandler(server, healthzURL, http.StatusServiceUnavailable, t)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
}
func TestLivezServer(t *testing.T) {
metrics.RegisterMetrics()
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
@ -508,42 +527,49 @@ func TestLivezServer(t *testing.T) {
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
hsTest := &serverTest{
server: server,
url: livezURL,
tracking200: 0,
tracking503: 0,
}
// Should return 200 "OK" by default.
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" after first update
hs.Updated()
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should continue to return 200 "OK" as long as no further updates are queued
fakeClock.Step(25 * time.Second)
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if exceed max update-processing time
hs.QueuedUpdate()
fakeClock.Step(25 * time.Second)
testHTTPHandler(server, livezURL, http.StatusServiceUnavailable, t)
testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
// Should return 200 "OK" after processing update
hs.Updated()
fakeClock.Step(5 * time.Second)
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted("other")))
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakDeleted()))
testHTTPHandler(server, livezURL, http.StatusOK, t)
testHTTPHandler(hsTest, http.StatusOK, t)
}
type url string
@ -553,9 +579,9 @@ var (
livezURL url = "/livez"
)
func testHTTPHandler(server httpServer, u url, status int, t *testing.T) {
handler := server.(*fakeHTTPServer).handler
req, err := http.NewRequest("GET", string(u), nil)
func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) {
handler := hsTest.server.(*fakeHTTPServer).handler
req, err := http.NewRequest("GET", string(hsTest.url), nil)
if err != nil {
t.Fatal(err)
}
@ -570,6 +596,31 @@ func testHTTPHandler(server httpServer, u url, status int, t *testing.T) {
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
t.Fatal(err)
}
if status == http.StatusOK {
hsTest.tracking200++
}
if status == http.StatusServiceUnavailable {
hsTest.tracking503++
}
if hsTest.url == healthzURL {
testMetricEquals(metrics.ProxyHealthz200Total, float64(hsTest.tracking200), t)
testMetricEquals(metrics.ProxyHealthz503Total, float64(hsTest.tracking503), t)
}
if hsTest.url == livezURL {
testMetricEquals(metrics.ProxyLivez200Total, float64(hsTest.tracking200), t)
testMetricEquals(metrics.ProxyLivez503Total, float64(hsTest.tracking503), t)
}
}
// testMetricEquals asserts that the counter metric currently holds expected.
//
// It is marked as a test helper so that failures are attributed to the line
// of the caller rather than to this function. If the metric value cannot be
// read, that failure is reported and the comparison is skipped: val carries no
// meaningful reading in that case, and comparing it would only add a second,
// misleading "unexpected metric" error on top of the real one.
func testMetricEquals(metric *basemetrics.Counter, expected float64, t *testing.T) {
	t.Helper()

	val, err := testutil.GetCounterMetricValue(metric)
	if err != nil {
		t.Errorf("unable to retrieve value for metric: %s, err: %v", metric.Name, err)
		return
	}
	if val != expected {
		t.Errorf("unexpected metric: %s, expected: %v, found: %v", metric.Name, expected, val)
	}
}
func TestServerWithSelectiveListeningAddress(t *testing.T) {

View File

@ -26,6 +26,7 @@ import (
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/proxy/metrics"
"k8s.io/utils/clock"
)
@ -192,8 +193,10 @@ func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff")
if !healthy {
metrics.ProxyHealthz503Total.Inc()
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
metrics.ProxyHealthz200Total.Inc()
resp.WriteHeader(http.StatusOK)
// In older releases, the returned "lastUpdated" time indicated the last
// time the proxier sync loop ran, even if nothing had changed. To
@ -214,8 +217,10 @@ func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("X-Content-Type-Options", "nosniff")
if !healthy {
metrics.ProxyLivez503Total.Inc()
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
metrics.ProxyLivez200Total.Inc()
resp.WriteHeader(http.StatusOK)
// In older releases, the returned "lastUpdated" time indicated the last
// time the proxier sync loop ran, even if nothing had changed. To

View File

@ -171,6 +171,50 @@ var (
[]string{"table"},
)
// ProxyHealthz200Total counts the healthz probe requests that were answered
// with HTTP status 200 (OK). It only ever increases; the healthz handler
// increments it once per healthy response.
ProxyHealthz200Total = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: kubeProxySubsystem,
Name: "proxy_healthz_200_total",
Help: "Cumulative proxy healthz HTTP status 200",
StabilityLevel: metrics.ALPHA,
},
)
// ProxyHealthz503Total counts the healthz probe requests that were answered
// with HTTP status 503 (Service Unavailable). It only ever increases; the
// healthz handler increments it once per unhealthy response.
ProxyHealthz503Total = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: kubeProxySubsystem,
Name: "proxy_healthz_503_total",
Help: "Cumulative proxy healthz HTTP status 503",
StabilityLevel: metrics.ALPHA,
},
)
// ProxyLivez200Total counts the livez probe requests that were answered with
// HTTP status 200 (OK). It only ever increases; the livez handler increments
// it once per healthy response.
ProxyLivez200Total = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: kubeProxySubsystem,
Name: "proxy_livez_200_total",
Help: "Cumulative proxy livez HTTP status 200",
StabilityLevel: metrics.ALPHA,
},
)
// ProxyLivez503Total counts the livez probe requests that were answered with
// HTTP status 503 (Service Unavailable). It only ever increases; the livez
// handler increments it once per unhealthy response.
ProxyLivez503Total = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: kubeProxySubsystem,
Name: "proxy_livez_503_total",
Help: "Cumulative proxy livez HTTP status 503",
StabilityLevel: metrics.ALPHA,
},
)
// SyncProxyRulesLastQueuedTimestamp is the last time a proxy sync was
// requested. If this is much larger than
// kubeproxy_sync_proxy_rules_last_timestamp_seconds, then something is hung.
@ -216,6 +260,11 @@ func RegisterMetrics() {
legacyregistry.MustRegister(IptablesPartialRestoreFailuresTotal)
legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp)
legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal)
legacyregistry.MustRegister(ProxyHealthz200Total)
legacyregistry.MustRegister(ProxyHealthz503Total)
legacyregistry.MustRegister(ProxyLivez200Total)
legacyregistry.MustRegister(ProxyLivez503Total)
})
}