mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
fix the deadlock in priority and fairness config controller
This commit is contained in:
parent
f6ccec3c14
commit
7114319b3e
@ -78,6 +78,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/api/flowcontrol/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/flowcontrol/v1beta1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
apitypes "k8s.io/apimachinery/pkg/types"
|
apitypes "k8s.io/apimachinery/pkg/types"
|
||||||
apierrors "k8s.io/apimachinery/pkg/util/errors"
|
apierrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
@ -232,14 +233,28 @@ func (cfgCtlr *configController) updateObservations() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// used from the unit tests only.
|
||||||
|
func (cfgCtlr *configController) getPriorityLevelState(plName string) *priorityLevelState {
|
||||||
|
cfgCtlr.lock.Lock()
|
||||||
|
defer cfgCtlr.lock.Unlock()
|
||||||
|
return cfgCtlr.priorityLevelStates[plName]
|
||||||
|
}
|
||||||
|
|
||||||
func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
|
func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
|
||||||
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
|
// Let the config worker stop when we are done
|
||||||
defer cfgCtlr.configQueue.ShutDown()
|
defer cfgCtlr.configQueue.ShutDown()
|
||||||
|
|
||||||
klog.Info("Starting API Priority and Fairness config controller")
|
klog.Info("Starting API Priority and Fairness config controller")
|
||||||
if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
|
if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
|
||||||
return fmt.Errorf("Never achieved initial sync")
|
return fmt.Errorf("Never achieved initial sync")
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.Info("Running API Priority and Fairness config worker")
|
klog.Info("Running API Priority and Fairness config worker")
|
||||||
wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
|
go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
|
||||||
|
|
||||||
|
<-stopCh
|
||||||
klog.Info("Shutting down API Priority and Fairness config worker")
|
klog.Info("Shutting down API Priority and Fairness config worker")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -21,12 +21,15 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
|
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
@ -307,6 +310,106 @@ func TestConfigConsumer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAPFControllerWithGracefulShutdown(t *testing.T) {
|
||||||
|
const plName = "test-ps"
|
||||||
|
fs := &flowcontrol.FlowSchema{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "test-fs",
|
||||||
|
},
|
||||||
|
Spec: flowcontrol.FlowSchemaSpec{
|
||||||
|
MatchingPrecedence: 100,
|
||||||
|
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
|
||||||
|
Name: plName,
|
||||||
|
},
|
||||||
|
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
|
||||||
|
Type: flowcontrol.FlowDistinguisherMethodByUserType,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pl := &flowcontrol.PriorityLevelConfiguration{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: plName,
|
||||||
|
},
|
||||||
|
Spec: flowcontrol.PriorityLevelConfigurationSpec{
|
||||||
|
Type: flowcontrol.PriorityLevelEnablementLimited,
|
||||||
|
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
|
||||||
|
AssuredConcurrencyShares: 10,
|
||||||
|
LimitResponse: flowcontrol.LimitResponse{
|
||||||
|
Type: flowcontrol.LimitResponseTypeReject,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
clientset := clientsetfake.NewSimpleClientset(fs, pl)
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
|
||||||
|
flowcontrolClient := clientset.FlowcontrolV1beta1()
|
||||||
|
cts := &ctlrTestState{t: t,
|
||||||
|
fcIfc: flowcontrolClient,
|
||||||
|
existingFSs: map[string]*flowcontrol.FlowSchema{},
|
||||||
|
existingPLs: map[string]*flowcontrol.PriorityLevelConfiguration{},
|
||||||
|
heldRequestsMap: map[string][]heldRequest{},
|
||||||
|
queues: map[string]*ctlrTestQueueSet{},
|
||||||
|
}
|
||||||
|
controller := newTestableController(TestableConfig{
|
||||||
|
informerFactory,
|
||||||
|
flowcontrolClient,
|
||||||
|
100,
|
||||||
|
time.Minute,
|
||||||
|
metrics.PriorityLevelConcurrencyObserverPairGenerator,
|
||||||
|
cts,
|
||||||
|
})
|
||||||
|
|
||||||
|
stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
|
||||||
|
var controllerErr error
|
||||||
|
|
||||||
|
informerFactory.Start(stopCh)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
status := informerFactory.WaitForCacheSync(ctx.Done())
|
||||||
|
if names := unsynced(status); len(names) > 0 {
|
||||||
|
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(controllerCompletedCh)
|
||||||
|
controllerErr = controller.Run(stopCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// ensure that the controller has run its first loop.
|
||||||
|
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
|
||||||
|
if controller.getPriorityLevelState(plName) == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(stopCh)
|
||||||
|
t.Log("waiting for the controller Run function to shutdown gracefully")
|
||||||
|
<-controllerCompletedCh
|
||||||
|
|
||||||
|
if controllerErr != nil {
|
||||||
|
t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func unsynced(status map[reflect.Type]bool) []string {
|
||||||
|
names := make([]string, 0)
|
||||||
|
|
||||||
|
for objType, synced := range status {
|
||||||
|
if !synced {
|
||||||
|
names = append(names, objType.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return names
|
||||||
|
}
|
||||||
|
|
||||||
func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) {
|
func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) {
|
||||||
t := cts.t
|
t := cts.t
|
||||||
ctlr := cts.cfgCtlr
|
ctlr := cts.cfgCtlr
|
||||||
|
Loading…
Reference in New Issue
Block a user