mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #98246 from YoyinZyc/add_context_to_apiserver
Apply request context to metrics in apiserver.
This commit is contained in:
commit
c07e7f8974
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package audit
|
package audit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||||
@ -84,13 +85,13 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
|
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
|
||||||
func ObserveEvent() {
|
func ObserveEvent(ctx context.Context) {
|
||||||
eventCounter.Inc()
|
eventCounter.WithContext(ctx).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request.
|
// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request.
|
||||||
func ObservePolicyLevel(level auditinternal.Level) {
|
func ObservePolicyLevel(ctx context.Context, level auditinternal.Level) {
|
||||||
levelCounter.WithLabelValues(string(level)).Inc()
|
levelCounter.WithContext(ctx).WithLabelValues(string(level)).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandlePluginError handles an error that occurred in an audit plugin. This method should only be
|
// HandlePluginError handles an error that occurred in an audit plugin. This method should only be
|
||||||
|
@ -149,7 +149,7 @@ func (a *Authenticator) AuthenticateRequest(req *http.Request) (*authenticator.R
|
|||||||
}
|
}
|
||||||
|
|
||||||
remaining := req.TLS.PeerCertificates[0].NotAfter.Sub(time.Now())
|
remaining := req.TLS.PeerCertificates[0].NotAfter.Sub(time.Now())
|
||||||
clientCertificateExpirationHistogram.Observe(remaining.Seconds())
|
clientCertificateExpirationHistogram.WithContext(req.Context()).Observe(remaining.Seconds())
|
||||||
chains, err := req.TLS.PeerCertificates[0].Verify(optsCopy)
|
chains, err := req.TLS.PeerCertificates[0].Verify(optsCopy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, fmt.Errorf(
|
return nil, false, fmt.Errorf(
|
||||||
|
@ -133,7 +133,7 @@ func (a *cachedTokenAuthenticator) AuthenticateToken(ctx context.Context, token
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, token string) *cacheRecord {
|
func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, token string) *cacheRecord {
|
||||||
doneAuthenticating := stats.authenticating()
|
doneAuthenticating := stats.authenticating(ctx)
|
||||||
|
|
||||||
auds, audsOk := authenticator.AudiencesFrom(ctx)
|
auds, audsOk := authenticator.AudiencesFrom(ctx)
|
||||||
|
|
||||||
@ -145,7 +145,7 @@ func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, toke
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Record cache miss
|
// Record cache miss
|
||||||
doneBlocking := stats.blocking()
|
doneBlocking := stats.blocking(ctx)
|
||||||
defer doneBlocking()
|
defer doneBlocking()
|
||||||
defer doneAuthenticating(false)
|
defer doneAuthenticating(false)
|
||||||
|
|
||||||
@ -153,7 +153,7 @@ func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, toke
|
|||||||
// always use one place to read and write the output of AuthenticateToken
|
// always use one place to read and write the output of AuthenticateToken
|
||||||
record := &cacheRecord{}
|
record := &cacheRecord{}
|
||||||
|
|
||||||
doneFetching := stats.fetching()
|
doneFetching := stats.fetching(ctx)
|
||||||
// We're leaving the request handling stack so we need to handle crashes
|
// We're leaving the request handling stack so we need to handle crashes
|
||||||
// ourselves. Log a stack trace and return a 500 if something panics.
|
// ourselves. Log a stack trace and return a 500 if something panics.
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/component-base/metrics"
|
"k8s.io/component-base/metrics"
|
||||||
@ -86,7 +87,7 @@ type statsCollector struct{}
|
|||||||
|
|
||||||
var stats = statsCollector{}
|
var stats = statsCollector{}
|
||||||
|
|
||||||
func (statsCollector) authenticating() func(hit bool) {
|
func (statsCollector) authenticating(ctx context.Context) func(hit bool) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
return func(hit bool) {
|
return func(hit bool) {
|
||||||
var tag string
|
var tag string
|
||||||
@ -98,18 +99,18 @@ func (statsCollector) authenticating() func(hit bool) {
|
|||||||
|
|
||||||
latency := time.Since(start)
|
latency := time.Since(start)
|
||||||
|
|
||||||
requestCount.WithLabelValues(tag).Inc()
|
requestCount.WithContext(ctx).WithLabelValues(tag).Inc()
|
||||||
requestLatency.WithLabelValues(tag).Observe(float64(latency.Milliseconds()) / 1000)
|
requestLatency.WithContext(ctx).WithLabelValues(tag).Observe(float64(latency.Milliseconds()) / 1000)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (statsCollector) blocking() func() {
|
func (statsCollector) blocking(ctx context.Context) func() {
|
||||||
activeFetchCount.WithLabelValues(fetchBlockedTag).Inc()
|
activeFetchCount.WithContext(ctx).WithLabelValues(fetchBlockedTag).Inc()
|
||||||
return activeFetchCount.WithLabelValues(fetchBlockedTag).Dec
|
return activeFetchCount.WithContext(ctx).WithLabelValues(fetchBlockedTag).Dec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (statsCollector) fetching() func(ok bool) {
|
func (statsCollector) fetching(ctx context.Context) func(ok bool) {
|
||||||
activeFetchCount.WithLabelValues(fetchInFlightTag).Inc()
|
activeFetchCount.WithContext(ctx).WithLabelValues(fetchInFlightTag).Inc()
|
||||||
return func(ok bool) {
|
return func(ok bool) {
|
||||||
var tag string
|
var tag string
|
||||||
if ok {
|
if ok {
|
||||||
@ -118,8 +119,8 @@ func (statsCollector) fetching() func(ok bool) {
|
|||||||
tag = fetchFailedTag
|
tag = fetchFailedTag
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchCount.WithLabelValues(tag).Inc()
|
fetchCount.WithContext(ctx).WithLabelValues(tag).Inc()
|
||||||
|
|
||||||
activeFetchCount.WithLabelValues(fetchInFlightTag).Dec()
|
activeFetchCount.WithContext(ctx).WithLabelValues(fetchInFlightTag).Dec()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,8 +56,8 @@ func TrackStarted(handler http.Handler, name string) http.Handler {
|
|||||||
// TrackCompleted measures the timestamp the given handler has completed execution and then
|
// TrackCompleted measures the timestamp the given handler has completed execution and then
|
||||||
// it updates the corresponding metric with the filter latency duration.
|
// it updates the corresponding metric with the filter latency duration.
|
||||||
func TrackCompleted(handler http.Handler) http.Handler {
|
func TrackCompleted(handler http.Handler) http.Handler {
|
||||||
return trackCompleted(handler, utilclock.RealClock{}, func(fr *requestFilterRecord, completedAt time.Time) {
|
return trackCompleted(handler, utilclock.RealClock{}, func(ctx context.Context, fr *requestFilterRecord, completedAt time.Time) {
|
||||||
metrics.RecordFilterLatency(fr.name, completedAt.Sub(fr.startedTimestamp))
|
metrics.RecordFilterLatency(ctx, fr.name, completedAt.Sub(fr.startedTimestamp))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,7 +81,7 @@ func trackStarted(handler http.Handler, name string, clock utilclock.PassiveCloc
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(*requestFilterRecord, time.Time)) http.Handler {
|
func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(context.Context, *requestFilterRecord, time.Time)) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// The previous filter has just completed.
|
// The previous filter has just completed.
|
||||||
completedAt := clock.Now()
|
completedAt := clock.Now()
|
||||||
@ -90,7 +90,7 @@ func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action f
|
|||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
if fr := requestFilterRecordFrom(ctx); fr != nil {
|
if fr := requestFilterRecordFrom(ctx); fr != nil {
|
||||||
action(fr, completedAt)
|
action(ctx, fr, completedAt)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package filterlatency
|
package filterlatency
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
@ -120,7 +121,7 @@ func TestTrackCompletedContextHasFilterRecord(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
requestFilterEndedAt := time.Now()
|
requestFilterEndedAt := time.Now()
|
||||||
wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(fr *requestFilterRecord, completedAt time.Time) {
|
wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(_ context.Context, fr *requestFilterRecord, completedAt time.Time) {
|
||||||
actionCallCount++
|
actionCallCount++
|
||||||
filterRecordGot = fr
|
filterRecordGot = fr
|
||||||
filterCompletedAtGot = completedAt
|
filterCompletedAtGot = completedAt
|
||||||
@ -156,7 +157,7 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) {
|
|||||||
handlerCallCount++
|
handlerCallCount++
|
||||||
})
|
})
|
||||||
|
|
||||||
wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ *requestFilterRecord, _ time.Time) {
|
wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ context.Context, _ *requestFilterRecord, _ time.Time) {
|
||||||
actionCallCount++
|
actionCallCount++
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ package filters
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
@ -56,8 +57,8 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
|
|||||||
}
|
}
|
||||||
|
|
||||||
ev.Stage = auditinternal.StageRequestReceived
|
ev.Stage = auditinternal.StageRequestReceived
|
||||||
if processed := processAuditEvent(sink, ev, omitStages); !processed {
|
if processed := processAuditEvent(ctx, sink, ev, omitStages); !processed {
|
||||||
audit.ApiserverAuditDroppedCounter.Inc()
|
audit.ApiserverAuditDroppedCounter.WithContext(ctx).Inc()
|
||||||
responsewriters.InternalError(w, req, errors.New("failed to store audit event"))
|
responsewriters.InternalError(w, req, errors.New("failed to store audit event"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -70,7 +71,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
|
|||||||
longRunningSink = sink
|
longRunningSink = sink
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
respWriter := decorateResponseWriter(w, ev, longRunningSink, omitStages)
|
respWriter := decorateResponseWriter(ctx, w, ev, longRunningSink, omitStages)
|
||||||
|
|
||||||
// send audit event when we leave this func, either via a panic or cleanly. In the case of long
|
// send audit event when we leave this func, either via a panic or cleanly. In the case of long
|
||||||
// running requests, this will be the second audit event.
|
// running requests, this will be the second audit event.
|
||||||
@ -84,7 +85,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
|
|||||||
Reason: metav1.StatusReasonInternalError,
|
Reason: metav1.StatusReasonInternalError,
|
||||||
Message: fmt.Sprintf("APIServer panic'd: %v", r),
|
Message: fmt.Sprintf("APIServer panic'd: %v", r),
|
||||||
}
|
}
|
||||||
processAuditEvent(sink, ev, omitStages)
|
processAuditEvent(ctx, sink, ev, omitStages)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,14 +99,14 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
|
|||||||
if ev.ResponseStatus == nil && longRunningSink != nil {
|
if ev.ResponseStatus == nil && longRunningSink != nil {
|
||||||
ev.ResponseStatus = fakedSuccessStatus
|
ev.ResponseStatus = fakedSuccessStatus
|
||||||
ev.Stage = auditinternal.StageResponseStarted
|
ev.Stage = auditinternal.StageResponseStarted
|
||||||
processAuditEvent(longRunningSink, ev, omitStages)
|
processAuditEvent(ctx, longRunningSink, ev, omitStages)
|
||||||
}
|
}
|
||||||
|
|
||||||
ev.Stage = auditinternal.StageResponseComplete
|
ev.Stage = auditinternal.StageResponseComplete
|
||||||
if ev.ResponseStatus == nil {
|
if ev.ResponseStatus == nil {
|
||||||
ev.ResponseStatus = fakedSuccessStatus
|
ev.ResponseStatus = fakedSuccessStatus
|
||||||
}
|
}
|
||||||
processAuditEvent(sink, ev, omitStages)
|
processAuditEvent(ctx, sink, ev, omitStages)
|
||||||
}()
|
}()
|
||||||
handler.ServeHTTP(respWriter, req)
|
handler.ServeHTTP(respWriter, req)
|
||||||
})
|
})
|
||||||
@ -125,7 +126,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
|
|||||||
}
|
}
|
||||||
|
|
||||||
level, omitStages := policy.LevelAndStages(attribs)
|
level, omitStages := policy.LevelAndStages(attribs)
|
||||||
audit.ObservePolicyLevel(level)
|
audit.ObservePolicyLevel(ctx, level)
|
||||||
if level == auditinternal.LevelNone {
|
if level == auditinternal.LevelNone {
|
||||||
// Don't audit.
|
// Don't audit.
|
||||||
return req, nil, nil, nil
|
return req, nil, nil, nil
|
||||||
@ -145,7 +146,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
|
|||||||
return req, ev, omitStages, nil
|
return req, ev, omitStages, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
|
func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
|
||||||
for _, stage := range omitStages {
|
for _, stage := range omitStages {
|
||||||
if ev.Stage == stage {
|
if ev.Stage == stage {
|
||||||
return true
|
return true
|
||||||
@ -157,12 +158,13 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au
|
|||||||
} else {
|
} else {
|
||||||
ev.StageTimestamp = metav1.NewMicroTime(time.Now())
|
ev.StageTimestamp = metav1.NewMicroTime(time.Now())
|
||||||
}
|
}
|
||||||
audit.ObserveEvent()
|
audit.ObserveEvent(ctx)
|
||||||
return sink.ProcessEvents(ev)
|
return sink.ProcessEvents(ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
|
func decorateResponseWriter(ctx context.Context, responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
|
||||||
delegate := &auditResponseWriter{
|
delegate := &auditResponseWriter{
|
||||||
|
ctx: ctx,
|
||||||
ResponseWriter: responseWriter,
|
ResponseWriter: responseWriter,
|
||||||
event: ev,
|
event: ev,
|
||||||
sink: sink,
|
sink: sink,
|
||||||
@ -186,6 +188,7 @@ var _ http.ResponseWriter = &auditResponseWriter{}
|
|||||||
// create immediately an event (for long running requests).
|
// create immediately an event (for long running requests).
|
||||||
type auditResponseWriter struct {
|
type auditResponseWriter struct {
|
||||||
http.ResponseWriter
|
http.ResponseWriter
|
||||||
|
ctx context.Context
|
||||||
event *auditinternal.Event
|
event *auditinternal.Event
|
||||||
once sync.Once
|
once sync.Once
|
||||||
sink audit.Sink
|
sink audit.Sink
|
||||||
@ -205,7 +208,7 @@ func (a *auditResponseWriter) processCode(code int) {
|
|||||||
a.event.Stage = auditinternal.StageResponseStarted
|
a.event.Stage = auditinternal.StageResponseStarted
|
||||||
|
|
||||||
if a.sink != nil {
|
if a.sink != nil {
|
||||||
processAuditEvent(a.sink, a.event, a.omitStages)
|
processAuditEvent(a.ctx, a.sink, a.event, a.omitStages)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ package filters
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@ -92,14 +93,14 @@ func (*fancyResponseWriter) Flush() {}
|
|||||||
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
|
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
|
||||||
|
|
||||||
func TestConstructResponseWriter(t *testing.T) {
|
func TestConstructResponseWriter(t *testing.T) {
|
||||||
actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil, nil)
|
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, nil, nil, nil)
|
||||||
switch v := actual.(type) {
|
switch v := actual.(type) {
|
||||||
case *auditResponseWriter:
|
case *auditResponseWriter:
|
||||||
default:
|
default:
|
||||||
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
|
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil, nil)
|
actual = decorateResponseWriter(context.Background(), &fancyResponseWriter{}, nil, nil, nil)
|
||||||
switch v := actual.(type) {
|
switch v := actual.(type) {
|
||||||
case *fancyResponseWriterDelegator:
|
case *fancyResponseWriterDelegator:
|
||||||
default:
|
default:
|
||||||
@ -109,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) {
|
|||||||
|
|
||||||
func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
|
func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
|
||||||
ev := &auditinternal.Event{}
|
ev := &auditinternal.Event{}
|
||||||
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil)
|
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)
|
||||||
|
|
||||||
// write status. This will not block because firstEventSentCh is nil
|
// write status. This will not block because firstEventSentCh is nil
|
||||||
actual.WriteHeader(42)
|
actual.WriteHeader(42)
|
||||||
@ -123,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
|
|||||||
|
|
||||||
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
|
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
|
||||||
ev := &auditinternal.Event{}
|
ev := &auditinternal.Event{}
|
||||||
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil)
|
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)
|
||||||
|
|
||||||
// write status. This will not block because firstEventSentCh is nil
|
// write status. This will not block because firstEventSentCh is nil
|
||||||
actual.Write([]byte("foo"))
|
actual.Write([]byte("foo"))
|
||||||
@ -138,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
|
|||||||
func TestDecorateResponseWriterChannel(t *testing.T) {
|
func TestDecorateResponseWriterChannel(t *testing.T) {
|
||||||
sink := &fakeAuditSink{}
|
sink := &fakeAuditSink{}
|
||||||
ev := &auditinternal.Event{}
|
ev := &auditinternal.Event{}
|
||||||
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink, nil)
|
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, sink, nil)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -47,7 +47,7 @@ func WithAuthentication(handler http.Handler, auth authenticator.Request, failed
|
|||||||
req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
|
req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
|
||||||
}
|
}
|
||||||
resp, ok, err := auth.AuthenticateRequest(req)
|
resp, ok, err := auth.AuthenticateRequest(req)
|
||||||
defer recordAuthMetrics(resp, ok, err, apiAuds, authenticationStart)
|
defer recordAuthMetrics(req.Context(), resp, ok, err, apiAuds, authenticationStart)
|
||||||
if err != nil || !ok {
|
if err != nil || !ok {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Unable to authenticate the request")
|
klog.ErrorS(err, "Unable to authenticate the request")
|
||||||
|
@ -52,7 +52,7 @@ func WithFailedAuthenticationAudit(failedHandler http.Handler, sink audit.Sink,
|
|||||||
ev.ResponseStatus.Message = getAuthMethods(req)
|
ev.ResponseStatus.Message = getAuthMethods(req)
|
||||||
ev.Stage = auditinternal.StageResponseStarted
|
ev.Stage = auditinternal.StageResponseStarted
|
||||||
|
|
||||||
rw := decorateResponseWriter(w, ev, sink, omitStages)
|
rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages)
|
||||||
failedHandler.ServeHTTP(rw, req)
|
failedHandler.ServeHTTP(rw, req)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package filters
|
package filters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -75,7 +76,7 @@ func init() {
|
|||||||
legacyregistry.MustRegister(authenticationLatency)
|
legacyregistry.MustRegister(authenticationLatency)
|
||||||
}
|
}
|
||||||
|
|
||||||
func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) {
|
func recordAuthMetrics(ctx context.Context, resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) {
|
||||||
var resultLabel string
|
var resultLabel string
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
@ -85,11 +86,11 @@ func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudi
|
|||||||
resultLabel = failureLabel
|
resultLabel = failureLabel
|
||||||
default:
|
default:
|
||||||
resultLabel = successLabel
|
resultLabel = successLabel
|
||||||
authenticatedUserCounter.WithLabelValues(compressUsername(resp.User.GetName())).Inc()
|
authenticatedUserCounter.WithContext(ctx).WithLabelValues(compressUsername(resp.User.GetName())).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
authenticatedAttemptsCounter.WithLabelValues(resultLabel).Inc()
|
authenticatedAttemptsCounter.WithContext(ctx).WithLabelValues(resultLabel).Inc()
|
||||||
authenticationLatency.WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds())
|
authenticationLatency.WithContext(ctx).WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
// compressUsername maps all possible usernames onto a small set of categories
|
// compressUsername maps all possible usernames onto a small set of categories
|
||||||
|
@ -126,7 +126,7 @@ func withFailedRequestAudit(failedHandler http.Handler, statusErr *apierrors.Sta
|
|||||||
ev.ResponseStatus.Message = statusErr.Error()
|
ev.ResponseStatus.Message = statusErr.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
rw := decorateResponseWriter(w, ev, sink, omitStages)
|
rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages)
|
||||||
failedHandler.ServeHTTP(rw, req)
|
failedHandler.ServeHTTP(rw, req)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -163,8 +163,8 @@ type WatchServer struct {
|
|||||||
// or over a websocket connection.
|
// or over a websocket connection.
|
||||||
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
kind := s.Scope.Kind
|
kind := s.Scope.Kind
|
||||||
metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
||||||
defer metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()
|
defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()
|
||||||
|
|
||||||
w = httplog.Unlogged(req, w)
|
w = httplog.Unlogged(req, w)
|
||||||
|
|
||||||
@ -220,7 +220,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
// End of results.
|
// End of results.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
metrics.WatchEvents.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
||||||
|
|
||||||
obj := s.Fixup(event.Object)
|
obj := s.Fixup(event.Object)
|
||||||
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
|
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
|
||||||
@ -233,7 +233,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
// type
|
// type
|
||||||
unknown.Raw = buf.Bytes()
|
unknown.Raw = buf.Bytes()
|
||||||
event.Object = &unknown
|
event.Object = &unknown
|
||||||
metrics.WatchEventsSizes.WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
|
metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
|
||||||
|
|
||||||
*outEvent = metav1.WatchEvent{}
|
*outEvent = metav1.WatchEvent{}
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ package metrics
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
@ -324,8 +325,8 @@ func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RecordFilterLatency(name string, elapsed time.Duration) {
|
func RecordFilterLatency(ctx context.Context, name string, elapsed time.Duration) {
|
||||||
requestFilterDuration.WithLabelValues(name).Observe(elapsed.Seconds())
|
requestFilterDuration.WithContext(ctx).WithLabelValues(name).Observe(elapsed.Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
// RecordRequestAbort records that the request was aborted possibly due to a timeout.
|
// RecordRequestAbort records that the request was aborted possibly due to a timeout.
|
||||||
@ -341,7 +342,7 @@ func RecordRequestAbort(req *http.Request, requestInfo *request.RequestInfo) {
|
|||||||
group := requestInfo.APIGroup
|
group := requestInfo.APIGroup
|
||||||
version := requestInfo.APIVersion
|
version := requestInfo.APIVersion
|
||||||
|
|
||||||
requestAbortsTotal.WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc()
|
requestAbortsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RecordRequestTermination records that the request was terminated early as part of a resource
|
// RecordRequestTermination records that the request was terminated early as part of a resource
|
||||||
@ -361,9 +362,9 @@ func RecordRequestTermination(req *http.Request, requestInfo *request.RequestInf
|
|||||||
reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req)
|
reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req)
|
||||||
|
|
||||||
if requestInfo.IsResourceRequest {
|
if requestInfo.IsResourceRequest {
|
||||||
requestTerminationsTotal.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc()
|
requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc()
|
||||||
} else {
|
} else {
|
||||||
requestTerminationsTotal.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc()
|
requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -383,9 +384,9 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, comp
|
|||||||
reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req)
|
reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req)
|
||||||
|
|
||||||
if requestInfo.IsResourceRequest {
|
if requestInfo.IsResourceRequest {
|
||||||
g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component)
|
g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component)
|
||||||
} else {
|
} else {
|
||||||
g = longRunningRequestGauge.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component)
|
g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component)
|
||||||
}
|
}
|
||||||
g.Inc()
|
g.Inc()
|
||||||
defer g.Dec()
|
defer g.Dec()
|
||||||
@ -404,23 +405,23 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour
|
|||||||
dryRun := cleanDryRun(req.URL)
|
dryRun := cleanDryRun(req.URL)
|
||||||
elapsedSeconds := elapsed.Seconds()
|
elapsedSeconds := elapsed.Seconds()
|
||||||
cleanContentType := cleanContentType(contentType)
|
cleanContentType := cleanContentType(contentType)
|
||||||
requestCounter.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc()
|
requestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc()
|
||||||
// MonitorRequest happens after authentication, so we can trust the username given by the request
|
// MonitorRequest happens after authentication, so we can trust the username given by the request
|
||||||
info, ok := request.UserFrom(req.Context())
|
info, ok := request.UserFrom(req.Context())
|
||||||
if ok && info.GetName() == user.APIServerUser {
|
if ok && info.GetName() == user.APIServerUser {
|
||||||
apiSelfRequestCounter.WithLabelValues(reportedVerb, resource, subresource).Inc()
|
apiSelfRequestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, resource, subresource).Inc()
|
||||||
}
|
}
|
||||||
if deprecated {
|
if deprecated {
|
||||||
deprecatedRequestGauge.WithLabelValues(group, version, resource, subresource, removedRelease).Set(1)
|
deprecatedRequestGauge.WithContext(req.Context()).WithLabelValues(group, version, resource, subresource, removedRelease).Set(1)
|
||||||
audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true")
|
audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true")
|
||||||
if len(removedRelease) > 0 {
|
if len(removedRelease) > 0 {
|
||||||
audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease)
|
audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
requestLatencies.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds)
|
requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds)
|
||||||
// We are only interested in response sizes of read requests.
|
// We are only interested in response sizes of read requests.
|
||||||
if verb == "GET" || verb == "LIST" {
|
if verb == "GET" || verb == "LIST" {
|
||||||
responseSizes.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))
|
responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,9 +196,9 @@ func WithMaxInFlightLimit(
|
|||||||
}
|
}
|
||||||
// We need to split this data between buckets used for throttling.
|
// We need to split this data between buckets used for throttling.
|
||||||
if isMutatingRequest {
|
if isMutatingRequest {
|
||||||
metrics.DroppedRequests.WithLabelValues(metrics.MutatingKind).Inc()
|
metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.MutatingKind).Inc()
|
||||||
} else {
|
} else {
|
||||||
metrics.DroppedRequests.WithLabelValues(metrics.ReadOnlyKind).Inc()
|
metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.ReadOnlyKind).Inc()
|
||||||
}
|
}
|
||||||
metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
|
metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
|
||||||
tooManyRequests(r, w)
|
tooManyRequests(r, w)
|
||||||
|
@ -141,9 +141,9 @@ func WithPriorityAndFairness(
|
|||||||
}, execute)
|
}, execute)
|
||||||
if !served {
|
if !served {
|
||||||
if isMutatingRequest {
|
if isMutatingRequest {
|
||||||
epmetrics.DroppedRequests.WithLabelValues(epmetrics.MutatingKind).Inc()
|
epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.MutatingKind).Inc()
|
||||||
} else {
|
} else {
|
||||||
epmetrics.DroppedRequests.WithLabelValues(epmetrics.ReadOnlyKind).Inc()
|
epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
|
||||||
}
|
}
|
||||||
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
|
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
|
||||||
tooManyRequests(r, w)
|
tooManyRequests(r, w)
|
||||||
|
@ -123,7 +123,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
|
|||||||
noteFn(fs, pl)
|
noteFn(fs, pl)
|
||||||
if req == nil {
|
if req == nil {
|
||||||
if queued {
|
if queued {
|
||||||
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
||||||
}
|
}
|
||||||
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
|
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
|
||||||
return
|
return
|
||||||
@ -140,18 +140,18 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
|
|||||||
}()
|
}()
|
||||||
idle = req.Finish(func() {
|
idle = req.Finish(func() {
|
||||||
if queued {
|
if queued {
|
||||||
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
||||||
}
|
}
|
||||||
metrics.AddDispatch(pl.Name, fs.Name)
|
metrics.AddDispatch(ctx, pl.Name, fs.Name)
|
||||||
executed = true
|
executed = true
|
||||||
startExecutionTime := time.Now()
|
startExecutionTime := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
|
metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, time.Since(startExecutionTime))
|
||||||
}()
|
}()
|
||||||
execFn()
|
execFn()
|
||||||
})
|
})
|
||||||
if queued && !executed {
|
if queued && !executed {
|
||||||
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
||||||
}
|
}
|
||||||
panicking = false
|
panicking = false
|
||||||
}
|
}
|
||||||
|
@ -243,7 +243,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
|
|||||||
if qs.qCfg.DesiredNumQueues < 1 {
|
if qs.qCfg.DesiredNumQueues < 1 {
|
||||||
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
|
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
|
||||||
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
|
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
|
||||||
metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit")
|
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
|
||||||
return nil, qs.isIdleLocked()
|
return nil, qs.isIdleLocked()
|
||||||
}
|
}
|
||||||
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
|
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
|
||||||
@ -262,7 +262,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
|
|||||||
// concurrency shares and at max queue length already
|
// concurrency shares and at max queue length already
|
||||||
if req == nil {
|
if req == nil {
|
||||||
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
|
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
|
||||||
metrics.AddReject(qs.qCfg.Name, fsName, "queue-full")
|
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full")
|
||||||
return nil, qs.isIdleLocked()
|
return nil, qs.isIdleLocked()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -351,7 +351,7 @@ func (req *request) wait() (bool, bool) {
|
|||||||
switch decision {
|
switch decision {
|
||||||
case decisionReject:
|
case decisionReject:
|
||||||
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
|
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
|
||||||
metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out")
|
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
|
||||||
return false, qs.isIdleLocked()
|
return false, qs.isIdleLocked()
|
||||||
case decisionCancel:
|
case decisionCancel:
|
||||||
// TODO(aaron-prindle) add metrics for this case
|
// TODO(aaron-prindle) add metrics for this case
|
||||||
@ -448,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
metrics.ObserveQueueLength(qs.qCfg.Name, fsName, len(queue.requests))
|
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests))
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -486,7 +486,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
|||||||
req.decision.SetLocked(decisionReject)
|
req.decision.SetLocked(decisionReject)
|
||||||
// get index for timed out requests
|
// get index for timed out requests
|
||||||
timeoutIdx = i
|
timeoutIdx = i
|
||||||
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
|
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
||||||
req.NoteQueued(false)
|
req.NoteQueued(false)
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
@ -534,7 +534,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
|
|||||||
}
|
}
|
||||||
queue.Enqueue(request)
|
queue.Enqueue(request)
|
||||||
qs.totRequestsWaiting++
|
qs.totRequestsWaiting++
|
||||||
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1)
|
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
||||||
request.NoteQueued(true)
|
request.NoteQueued(true)
|
||||||
qs.obsPair.RequestsWaiting.Add(1)
|
qs.obsPair.RequestsWaiting.Add(1)
|
||||||
}
|
}
|
||||||
@ -569,7 +569,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
|
|||||||
}
|
}
|
||||||
req.decision.SetLocked(decisionExecute)
|
req.decision.SetLocked(decisionExecute)
|
||||||
qs.totRequestsExecuting++
|
qs.totRequestsExecuting++
|
||||||
metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1)
|
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
|
||||||
qs.obsPair.RequestsExecuting.Add(1)
|
qs.obsPair.RequestsExecuting.Add(1)
|
||||||
if klog.V(5).Enabled() {
|
if klog.V(5).Enabled() {
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
|
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
|
||||||
@ -599,9 +599,9 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
qs.totRequestsWaiting--
|
qs.totRequestsWaiting--
|
||||||
qs.totRequestsExecuting++
|
qs.totRequestsExecuting++
|
||||||
queue.requestsExecuting++
|
queue.requestsExecuting++
|
||||||
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1)
|
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
|
||||||
request.NoteQueued(false)
|
request.NoteQueued(false)
|
||||||
metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1)
|
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
||||||
qs.obsPair.RequestsWaiting.Add(-1)
|
qs.obsPair.RequestsWaiting.Add(-1)
|
||||||
qs.obsPair.RequestsExecuting.Add(1)
|
qs.obsPair.RequestsExecuting.Add(1)
|
||||||
if klog.V(6).Enabled() {
|
if klog.V(6).Enabled() {
|
||||||
@ -631,7 +631,7 @@ func (qs *queueSet) cancelWait(req *request) {
|
|||||||
// remove the request
|
// remove the request
|
||||||
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
|
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
|
||||||
qs.totRequestsWaiting--
|
qs.totRequestsWaiting--
|
||||||
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
|
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
||||||
req.NoteQueued(false)
|
req.NoteQueued(false)
|
||||||
qs.obsPair.RequestsWaiting.Add(-1)
|
qs.obsPair.RequestsWaiting.Add(-1)
|
||||||
break
|
break
|
||||||
@ -704,7 +704,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
|
|||||||
func (qs *queueSet) finishRequestLocked(r *request) {
|
func (qs *queueSet) finishRequestLocked(r *request) {
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
qs.totRequestsExecuting--
|
qs.totRequestsExecuting--
|
||||||
metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1)
|
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
|
||||||
qs.obsPair.RequestsExecuting.Add(-1)
|
qs.obsPair.RequestsExecuting.Add(-1)
|
||||||
|
|
||||||
if r.queue == nil {
|
if r.queue == nil {
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -221,12 +222,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
|
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
|
||||||
func AddRequestsInQueues(priorityLevel, flowSchema string, delta int) {
|
func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
|
||||||
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
|
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel
|
// AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel
|
||||||
func AddRequestsExecuting(priorityLevel, flowSchema string, delta int) {
|
func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) {
|
||||||
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
|
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,26 +237,26 @@ func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AddReject increments the # of rejected requests for flow control
|
// AddReject increments the # of rejected requests for flow control
|
||||||
func AddReject(priorityLevel, flowSchema, reason string) {
|
func AddReject(ctx context.Context, priorityLevel, flowSchema, reason string) {
|
||||||
apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, flowSchema, reason).Add(1)
|
apiserverRejectedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, reason).Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddDispatch increments the # of dispatched requests for flow control
|
// AddDispatch increments the # of dispatched requests for flow control
|
||||||
func AddDispatch(priorityLevel, flowSchema string) {
|
func AddDispatch(ctx context.Context, priorityLevel, flowSchema string) {
|
||||||
apiserverDispatchedRequestsTotal.WithLabelValues(priorityLevel, flowSchema).Add(1)
|
apiserverDispatchedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObserveQueueLength observes the queue length for flow control
|
// ObserveQueueLength observes the queue length for flow control
|
||||||
func ObserveQueueLength(priorityLevel, flowSchema string, length int) {
|
func ObserveQueueLength(ctx context.Context, priorityLevel, flowSchema string, length int) {
|
||||||
apiserverRequestQueueLength.WithLabelValues(priorityLevel, flowSchema).Observe(float64(length))
|
apiserverRequestQueueLength.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(float64(length))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObserveWaitingDuration observes the queue length for flow control
|
// ObserveWaitingDuration observes the queue length for flow control
|
||||||
func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) {
|
func ObserveWaitingDuration(ctx context.Context, priorityLevel, flowSchema, execute string, waitTime time.Duration) {
|
||||||
apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
|
apiserverRequestWaitingSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObserveExecutionDuration observes the execution duration for flow control
|
// ObserveExecutionDuration observes the execution duration for flow control
|
||||||
func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) {
|
func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema string, executionTime time.Duration) {
|
||||||
apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
|
apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user