mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Add multi request test
This commit is contained in:
parent
ce00f4ee54
commit
833ce487b9
@ -22,6 +22,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -39,8 +40,10 @@ import (
|
|||||||
"k8s.io/component-base/metrics/legacyregistry"
|
"k8s.io/component-base/metrics/legacyregistry"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type mockDecision int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
decisionNoQueuingExecute = iota
|
decisionNoQueuingExecute mockDecision = iota
|
||||||
decisionQueuingExecute
|
decisionQueuingExecute
|
||||||
decisionCancelWait
|
decisionCancelWait
|
||||||
decisionReject
|
decisionReject
|
||||||
@ -48,7 +51,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type fakeApfFilter struct {
|
type fakeApfFilter struct {
|
||||||
mockDecision int
|
mockDecision mockDecision
|
||||||
postEnqueue func()
|
postEnqueue func()
|
||||||
postDequeue func()
|
postDequeue func()
|
||||||
}
|
}
|
||||||
@ -92,29 +95,41 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error {
|
|||||||
func (t fakeApfFilter) Install(c *mux.PathRecorderMux) {
|
func (t fakeApfFilter) Install(c *mux.PathRecorderMux) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newApfServer(decision int, t *testing.T) *httptest.Server {
|
func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server {
|
||||||
|
onExecuteFunc := func() {
|
||||||
|
if decision == decisionCancelWait {
|
||||||
|
t.Errorf("execute should not be invoked")
|
||||||
|
}
|
||||||
|
// atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time.
|
||||||
|
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
|
||||||
|
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
postExecuteFunc := func() {}
|
||||||
|
// atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time.
|
||||||
|
postEnqueueFunc := func() {
|
||||||
|
if atomicReadOnlyWaiting != 1 {
|
||||||
|
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
postDequeueFunc := func() {
|
||||||
|
if atomicReadOnlyWaiting != 0 {
|
||||||
|
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
|
||||||
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
||||||
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
||||||
|
|
||||||
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
if decision == decisionCancelWait {
|
onExecute()
|
||||||
t.Errorf("execute should not be invoked")
|
|
||||||
}
|
|
||||||
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
|
|
||||||
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
|
|
||||||
}
|
|
||||||
}), longRunningRequestCheck, fakeApfFilter{
|
}), longRunningRequestCheck, fakeApfFilter{
|
||||||
mockDecision: decision,
|
mockDecision: decision,
|
||||||
postEnqueue: func() {
|
postEnqueue: postEnqueue,
|
||||||
if atomicReadOnlyWaiting != 1 {
|
postDequeue: postDequeue,
|
||||||
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
postDequeue: func() {
|
|
||||||
if atomicReadOnlyWaiting != 0 {
|
|
||||||
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
|
|
||||||
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -122,6 +137,7 @@ func newApfServer(decision int, t *testing.T) *httptest.Server {
|
|||||||
Groups: []string{user.AllUnauthenticated},
|
Groups: []string{user.AllUnauthenticated},
|
||||||
}))
|
}))
|
||||||
apfHandler.ServeHTTP(w, r)
|
apfHandler.ServeHTTP(w, r)
|
||||||
|
postExecute()
|
||||||
if atomicReadOnlyExecuting != 0 {
|
if atomicReadOnlyExecuting != 0 {
|
||||||
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
|
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
|
||||||
}
|
}
|
||||||
@ -134,9 +150,10 @@ func newApfServer(decision int, t *testing.T) *httptest.Server {
|
|||||||
func TestApfSkipLongRunningRequest(t *testing.T) {
|
func TestApfSkipLongRunningRequest(t *testing.T) {
|
||||||
epmetrics.Register()
|
epmetrics.Register()
|
||||||
|
|
||||||
server := newApfServer(decisionSkipFilter, t)
|
server := newApfServerWithSingleRequest(decisionSkipFilter, t)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
|
// send a watch request to test skipping long running request
|
||||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
|
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
|
||||||
// request should not be rejected
|
// request should not be rejected
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -146,7 +163,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
|
|||||||
func TestApfRejectRequest(t *testing.T) {
|
func TestApfRejectRequest(t *testing.T) {
|
||||||
epmetrics.Register()
|
epmetrics.Register()
|
||||||
|
|
||||||
server := newApfServer(decisionReject, t)
|
server := newApfServerWithSingleRequest(decisionReject, t)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
|
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
|
||||||
@ -163,10 +180,11 @@ func TestApfExemptRequest(t *testing.T) {
|
|||||||
epmetrics.Register()
|
epmetrics.Register()
|
||||||
fcmetrics.Register()
|
fcmetrics.Register()
|
||||||
|
|
||||||
// wait the first sampleAndWaterMark metrics to be collected
|
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
|
||||||
|
// so that an observation will cause some data to go into the Prometheus metrics.
|
||||||
time.Sleep(time.Millisecond * 50)
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
|
||||||
server := newApfServer(decisionNoQueuingExecute, t)
|
server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
||||||
@ -184,10 +202,11 @@ func TestApfExecuteRequest(t *testing.T) {
|
|||||||
epmetrics.Register()
|
epmetrics.Register()
|
||||||
fcmetrics.Register()
|
fcmetrics.Register()
|
||||||
|
|
||||||
// wait the first sampleAndWaterMark metrics to be collected
|
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
|
||||||
|
// so that an observation will cause some data to go into the Prometheus metrics.
|
||||||
time.Sleep(time.Millisecond * 50)
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
|
||||||
server := newApfServer(decisionQueuingExecute, t)
|
server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
||||||
@ -202,10 +221,81 @@ func TestApfExecuteRequest(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestApfExecuteMultipleRequests(t *testing.T) {
|
||||||
|
epmetrics.Register()
|
||||||
|
fcmetrics.Register()
|
||||||
|
|
||||||
|
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
|
||||||
|
// so that an observation will cause some data to go into the Prometheus metrics.
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
|
||||||
|
concurrentRequests := 5
|
||||||
|
var preStartExecute, postStartExecute, preEnqueue, postEnqueue, preDequeue, postDequeue, finishExecute sync.WaitGroup
|
||||||
|
for _, wg := range []*sync.WaitGroup{&preStartExecute, &postStartExecute, &preEnqueue, &postEnqueue, &preDequeue, &postDequeue, &finishExecute} {
|
||||||
|
wg.Add(concurrentRequests)
|
||||||
|
}
|
||||||
|
|
||||||
|
onExecuteFunc := func() {
|
||||||
|
preStartExecute.Done()
|
||||||
|
preStartExecute.Wait()
|
||||||
|
if int(atomicReadOnlyExecuting) != concurrentRequests {
|
||||||
|
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting)
|
||||||
|
}
|
||||||
|
postStartExecute.Done()
|
||||||
|
postStartExecute.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
postEnqueueFunc := func() {
|
||||||
|
preEnqueue.Done()
|
||||||
|
preEnqueue.Wait()
|
||||||
|
if int(atomicReadOnlyWaiting) != concurrentRequests {
|
||||||
|
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
|
||||||
|
|
||||||
|
}
|
||||||
|
postEnqueue.Done()
|
||||||
|
postEnqueue.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
postDequeueFunc := func() {
|
||||||
|
preDequeue.Done()
|
||||||
|
preDequeue.Wait()
|
||||||
|
if atomicReadOnlyWaiting != 0 {
|
||||||
|
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
|
||||||
|
}
|
||||||
|
postDequeue.Done()
|
||||||
|
postDequeue.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
postExecuteFunc := func() {
|
||||||
|
finishExecute.Done()
|
||||||
|
finishExecute.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
for i := 0; i < concurrentRequests; i++ {
|
||||||
|
var err error
|
||||||
|
go func() {
|
||||||
|
err = expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK)
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
checkForExpectedMetricsWithRetry(t, []string{
|
||||||
|
"apiserver_current_inflight_requests",
|
||||||
|
"apiserver_current_inqueue_requests",
|
||||||
|
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
|
||||||
|
"apiserver_flowcontrol_read_vs_write_request_count_samples",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestApfCancelWaitRequest(t *testing.T) {
|
func TestApfCancelWaitRequest(t *testing.T) {
|
||||||
epmetrics.Register()
|
epmetrics.Register()
|
||||||
|
|
||||||
server := newApfServer(decisionCancelWait, t)
|
server := newApfServerWithSingleRequest(decisionCancelWait, t)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
|
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
|
||||||
@ -232,7 +322,6 @@ func checkForExpectedMetricsWithRetry(t *testing.T, expectedMetrics []string) {
|
|||||||
|
|
||||||
metrics := map[string]interface{}{}
|
metrics := map[string]interface{}{}
|
||||||
for _, mf := range metricsFamily {
|
for _, mf := range metricsFamily {
|
||||||
mf := mf
|
|
||||||
metrics[*mf.Name] = mf
|
metrics[*mf.Name] = mf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user