mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Fix data race in PVC Run/Stop methods
This commit is contained in:
parent
c39b584ea2
commit
387f9ea952
@ -436,7 +436,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
|
|||||||
nil, // eventRecorder
|
nil, // eventRecorder
|
||||||
s.VolumeConfiguration.EnableDynamicProvisioning,
|
s.VolumeConfiguration.EnableDynamicProvisioning,
|
||||||
)
|
)
|
||||||
volumeController.Run()
|
volumeController.Run(wait.NeverStop)
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
|
||||||
attachDetachController, attachDetachControllerErr :=
|
attachDetachController, attachDetachControllerErr :=
|
||||||
|
@ -302,7 +302,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
nil, // eventRecorder
|
nil, // eventRecorder
|
||||||
s.VolumeConfiguration.EnableDynamicProvisioning,
|
s.VolumeConfiguration.EnableDynamicProvisioning,
|
||||||
)
|
)
|
||||||
volumeController.Run()
|
volumeController.Run(wait.NeverStop)
|
||||||
|
|
||||||
var rootCA []byte
|
var rootCA []byte
|
||||||
|
|
||||||
|
@ -154,13 +154,10 @@ const createProvisionedPVInterval = 10 * time.Second
|
|||||||
// changes.
|
// changes.
|
||||||
type PersistentVolumeController struct {
|
type PersistentVolumeController struct {
|
||||||
volumeController *framework.Controller
|
volumeController *framework.Controller
|
||||||
volumeControllerStopCh chan struct{}
|
|
||||||
volumeSource cache.ListerWatcher
|
volumeSource cache.ListerWatcher
|
||||||
claimController *framework.Controller
|
claimController *framework.Controller
|
||||||
claimControllerStopCh chan struct{}
|
|
||||||
claimSource cache.ListerWatcher
|
claimSource cache.ListerWatcher
|
||||||
classReflector *cache.Reflector
|
classReflector *cache.Reflector
|
||||||
classReflectorStopCh chan struct{}
|
|
||||||
classSource cache.ListerWatcher
|
classSource cache.ListerWatcher
|
||||||
kubeClient clientset.Interface
|
kubeClient clientset.Interface
|
||||||
eventRecorder record.EventRecorder
|
eventRecorder record.EventRecorder
|
||||||
|
@ -442,33 +442,12 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run starts all of this controller's control loops
|
// Run starts all of this controller's control loops
|
||||||
func (ctrl *PersistentVolumeController) Run() {
|
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
|
||||||
glog.V(4).Infof("starting PersistentVolumeController")
|
glog.V(4).Infof("starting PersistentVolumeController")
|
||||||
|
|
||||||
ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
|
ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
|
||||||
|
go ctrl.volumeController.Run(stopCh)
|
||||||
if ctrl.volumeControllerStopCh == nil {
|
go ctrl.claimController.Run(stopCh)
|
||||||
ctrl.volumeControllerStopCh = make(chan struct{})
|
go ctrl.classReflector.RunUntil(stopCh)
|
||||||
go ctrl.volumeController.Run(ctrl.volumeControllerStopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ctrl.claimControllerStopCh == nil {
|
|
||||||
ctrl.claimControllerStopCh = make(chan struct{})
|
|
||||||
go ctrl.claimController.Run(ctrl.claimControllerStopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ctrl.classReflectorStopCh == nil {
|
|
||||||
ctrl.classReflectorStopCh = make(chan struct{})
|
|
||||||
go ctrl.classReflector.RunUntil(ctrl.classReflectorStopCh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop gracefully shuts down this controller
|
|
||||||
func (ctrl *PersistentVolumeController) Stop() {
|
|
||||||
glog.V(4).Infof("stopping PersistentVolumeController")
|
|
||||||
close(ctrl.volumeControllerStopCh)
|
|
||||||
close(ctrl.claimControllerStopCh)
|
|
||||||
close(ctrl.classReflectorStopCh)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -175,7 +175,8 @@ func TestControllerSync(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start the controller
|
// Start the controller
|
||||||
go ctrl.Run()
|
stopCh := make(chan struct{})
|
||||||
|
ctrl.Run(stopCh)
|
||||||
|
|
||||||
// Wait for the controller to pass initial sync and fill its caches.
|
// Wait for the controller to pass initial sync and fill its caches.
|
||||||
for !ctrl.volumeController.HasSynced() ||
|
for !ctrl.volumeController.HasSynced() ||
|
||||||
@ -201,7 +202,7 @@ func TestControllerSync(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Failed to run test %s: %v", test.name, err)
|
t.Errorf("Failed to run test %s: %v", test.name, err)
|
||||||
}
|
}
|
||||||
ctrl.Stop()
|
close(stopCh)
|
||||||
|
|
||||||
evaluateTestResults(ctrl, reactor, test, t)
|
evaluateTestResults(ctrl, reactor, test, t)
|
||||||
}
|
}
|
||||||
|
@ -123,8 +123,9 @@ func TestPersistentVolumeRecycler(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
ctrl.Run()
|
stopCh := make(chan struct{})
|
||||||
defer ctrl.Stop()
|
ctrl.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
// This PV will be claimed, released, and recycled.
|
// This PV will be claimed, released, and recycled.
|
||||||
pv := createPV("fake-pv-recycler", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRecycle)
|
pv := createPV("fake-pv-recycler", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRecycle)
|
||||||
@ -176,8 +177,9 @@ func TestPersistentVolumeDeleter(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
ctrl.Run()
|
stopCh := make(chan struct{})
|
||||||
defer ctrl.Stop()
|
ctrl.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
// This PV will be claimed, released, and deleted.
|
// This PV will be claimed, released, and deleted.
|
||||||
pv := createPV("fake-pv-deleter", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimDelete)
|
pv := createPV("fake-pv-deleter", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimDelete)
|
||||||
@ -234,8 +236,9 @@ func TestPersistentVolumeBindRace(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
ctrl.Run()
|
stopCh := make(chan struct{})
|
||||||
defer ctrl.Stop()
|
ctrl.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
pv := createPV("fake-pv-race", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRetain)
|
pv := createPV("fake-pv-race", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRetain)
|
||||||
pvc := createPVC("fake-pvc-race", ns.Name, "5G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce})
|
pvc := createPVC("fake-pvc-race", ns.Name, "5G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce})
|
||||||
@ -304,8 +307,9 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
controller.Run()
|
stopCh := make(chan struct{})
|
||||||
defer controller.Stop()
|
controller.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
@ -383,8 +387,9 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
controller.Run()
|
stopCh := make(chan struct{})
|
||||||
defer controller.Stop()
|
controller.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
@ -481,8 +486,9 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
controller.Run()
|
stopCh := make(chan struct{})
|
||||||
defer controller.Stop()
|
controller.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
maxPVs := getObjectCount()
|
maxPVs := getObjectCount()
|
||||||
pvs := make([]*api.PersistentVolume, maxPVs)
|
pvs := make([]*api.PersistentVolume, maxPVs)
|
||||||
@ -569,8 +575,9 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
binder.Run()
|
controllerStopCh := make(chan struct{})
|
||||||
defer binder.Stop()
|
binder.Run(controllerStopCh)
|
||||||
|
defer close(controllerStopCh)
|
||||||
|
|
||||||
objCount := getObjectCount()
|
objCount := getObjectCount()
|
||||||
pvs := make([]*api.PersistentVolume, objCount)
|
pvs := make([]*api.PersistentVolume, objCount)
|
||||||
@ -781,8 +788,9 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start the controller when all PVs and PVCs are already saved in etcd
|
// Start the controller when all PVs and PVCs are already saved in etcd
|
||||||
binder.Run()
|
stopCh := make(chan struct{})
|
||||||
defer binder.Stop()
|
binder.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
// wait for at least two sync periods for changes. No volume should be
|
// wait for at least two sync periods for changes. No volume should be
|
||||||
// Released and no claim should be Lost during this time.
|
// Released and no claim should be Lost during this time.
|
||||||
@ -867,8 +875,9 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
testClient.Extensions().StorageClasses().Create(&storageClass)
|
testClient.Extensions().StorageClasses().Create(&storageClass)
|
||||||
|
|
||||||
binder.Run()
|
stopCh := make(chan struct{})
|
||||||
defer binder.Stop()
|
binder.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
objCount := getObjectCount()
|
objCount := getObjectCount()
|
||||||
pvcs := make([]*api.PersistentVolumeClaim, objCount)
|
pvcs := make([]*api.PersistentVolumeClaim, objCount)
|
||||||
@ -951,8 +960,9 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
|
|||||||
// non-namespaced objects (PersistenceVolumes).
|
// non-namespaced objects (PersistenceVolumes).
|
||||||
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{})
|
||||||
|
|
||||||
controller.Run()
|
stopCh := make(chan struct{})
|
||||||
defer controller.Stop()
|
controller.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
// This PV will be claimed, released, and deleted
|
// This PV will be claimed, released, and deleted
|
||||||
pv_rwo := createPV("pv-rwo", "/tmp/foo", "10G",
|
pv_rwo := createPV("pv-rwo", "/tmp/foo", "10G",
|
||||||
|
Loading…
Reference in New Issue
Block a user