Merge pull request #57610 from vikaschoudhary16/remove-redundant-sleep
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Remove redundant sleep from ReRegistration unit test case

/kind cleanup
/sig node

**What this PR does / why we need it**:
Once upon a time, there was a race in the device plugin registration logic. At that time, [list()](5cac9fc984/pkg/kubelet/deviceplugin/manager.go#L206) and [listAndWatch()](5cac9fc984/pkg/kubelet/deviceplugin/manager.go#L224) were separate functions, and `manager.mutex` was locked from two racing call sites: [one from within m.addEndpoint()](5cac9fc984/pkg/kubelet/deviceplugin/manager.go#L214) and [the other from within m.Devices()](5cac9fc984/pkg/kubelet/deviceplugin/manager.go#L137). This race made `TestDevicePluginReRegistration` flaky, as explained below.

```
1. p1.Register(socketName, testResourceName)
2. // Wait for the first callback to be issued.
3. <-callbackChan
4. devices := m.Devices()
```

* L#1 eventually leads to an **asynchronous** invocation of m.addEndpoint(); call that goroutine **thread1**.
* L#3 blocks the test case until the [callback is invoked](5cac9fc984/pkg/kubelet/deviceplugin/endpoint.go#L108), i.e. until **thread1** reaches the point where the [e.list() call completes in addEndpoint()](5cac9fc984/pkg/kubelet/deviceplugin/manager.go#L206).
* L#4 then runs on the test's own goroutine, which races with thread1 for m.mutex.Lock(): the former in m.Devices(), the latter in addEndpoint(). If m.Devices() wins the race, the test case fails, because the endpoint is added to the manager only after mutex.Lock() is taken in addEndpoint(). (A minimal sketch of this window follows the description.)

To deal with this flake, a `Sleep` was added between L#3 and L#4. The `Sleep` gave addEndpoint() some extra time and thus let thread1 win the race every time.

The race scenario explained above was fixed and merged some time back in [Deviceplugin refactoring: merge func list and listwatch in endpoint into one](https://github.com/kubernetes/kubernetes/pull/52149). With that PR, the callback function is invoked from e.run(), which guarantees that the test case waits on the channel until the endpoint is added and the devices are updated. Since the race no longer exists, this PR removes the now-redundant sleeps from the test case.

Tested: `go test -race -count 500 k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin -run TestDevicePluginReRegistration -timeout 5h`

Related #52616 #56026

**Special notes for your reviewer**:

**Release note**:
```release-note
None
```

/cc @vishh @derekwaynecarr @jiayingz @RenaudWasTaken @lichuqiang @ScorpioCPH @tengqm @mindprince @ConnorDoyle @jeremyeder
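To make the timing window concrete, here is a minimal, self-contained Go sketch of the old flow. The names `manager`, `addEndpoint`, and `Devices` merely echo the kubelet code, and the 10 ms sleep is an artificial stand-in for scheduling delay; this is not the real implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// manager is a toy stand-in for the device plugin manager.
type manager struct {
	mutex     sync.Mutex
	endpoints map[string][]string // resource name -> device IDs
	callback  func()
}

// addEndpoint mimics the old flow: the callback fired (from e.list())
// BEFORE the endpoint was stored under the lock, so a reader that woke up
// on the callback could still observe an empty map.
func (m *manager) addEndpoint(resource string, devs []string) {
	m.callback()                      // the test's <-callbackChan unblocks here...
	time.Sleep(10 * time.Millisecond) // artificial scheduling delay
	m.mutex.Lock()                    // ...but the endpoint is only added here
	m.endpoints[resource] = devs
	m.mutex.Unlock()
}

func (m *manager) Devices() map[string][]string {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	out := make(map[string][]string, len(m.endpoints))
	for k, v := range m.endpoints {
		out[k] = v
	}
	return out
}

func main() {
	callbackChan := make(chan struct{})
	m := &manager{
		endpoints: map[string][]string{},
		callback:  func() { callbackChan <- struct{}{} },
	}
	go m.addEndpoint("res", []string{"Dev1", "Dev2"}) // L#1: "thread1"
	<-callbackChan                                    // L#3: wait for the callback
	// L#4: usually wins the race against addEndpoint's Lock and sees 0 devices.
	fmt.Println("devices observed:", len(m.Devices()["res"]))
}
```

Running this typically prints `devices observed: 0`, which is exactly the flake the old `Sleep` between L#3 and L#4 papered over.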
This commit is contained in: commit a4eb2f96d0
```diff
@@ -24,7 +24,6 @@ import (
 	"reflect"
 	"sync/atomic"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -68,36 +67,29 @@ func TestDevicePluginReRegistration(t *testing.T) {
 		{ID: "Dev3", Health: pluginapi.Healthy},
 	}
 
-	callbackCount := 0
-	callbackChan := make(chan int)
-	var stopping int32
-	stopping = 0
+	expCallbackCount := int32(0)
+	callbackCount := int32(0)
+	callbackChan := make(chan int32)
 	callback := func(n string, a, u, r []pluginapi.Device) {
-		// Should be called three times, one for each plugin registration, till we are stopping.
-		if callbackCount > 2 && atomic.LoadInt32(&stopping) <= 0 {
+		callbackCount++
+		if callbackCount > atomic.LoadInt32(&expCallbackCount) {
 			t.FailNow()
 		}
-		callbackCount++
 		callbackChan <- callbackCount
 	}
 	m, p1 := setup(t, devs, callback)
+	atomic.StoreInt32(&expCallbackCount, 1)
 	p1.Register(socketName, testResourceName)
 	// Wait for the first callback to be issued.
 
 	<-callbackChan
-	// Wait till the endpoint is added to the manager.
-	for i := 0; i < 20; i++ {
-		if len(m.Devices()) > 0 {
-			break
-		}
-		time.Sleep(1)
-	}
 	devices := m.Devices()
 	require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
 
 	p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
 	err := p2.Start()
 	require.NoError(t, err)
+	atomic.StoreInt32(&expCallbackCount, 2)
 	p2.Register(socketName, testResourceName)
 	// Wait for the second callback to be issued.
 	<-callbackChan
@@ -109,20 +101,17 @@ func TestDevicePluginReRegistration(t *testing.T) {
 	p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third")
 	err = p3.Start()
 	require.NoError(t, err)
+	atomic.StoreInt32(&expCallbackCount, 3)
 	p3.Register(socketName, testResourceName)
 	// Wait for the second callback to be issued.
 	<-callbackChan
 
 	devices3 := m.Devices()
 	require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
-	// Wait long enough to catch unexpected callbacks.
-	time.Sleep(5 * time.Second)
-
-	atomic.StoreInt32(&stopping, 1)
 	p2.Stop()
 	p3.Stop()
 	cleanup(t, m, p1)
 	close(callbackChan)
 }
 
 func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Manager, *Stub) {
```
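The heart of the new test is the expected-callback-count guard visible in the hunks above. Below is a condensed, runnable sketch of that pattern, detached from the device plugin harness; the `register` closure is a hypothetical stand-in for `Stub.Register`, and `panic` stands in for `t.FailNow()`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// Mirrors the variables in the new test: the test raises the expected
	// count before each registration, and the callback fails fast if it
	// ever runs more times than expected.
	expCallbackCount := int32(0)
	callbackCount := int32(0)
	callbackChan := make(chan int32)

	callback := func() {
		callbackCount++
		if callbackCount > atomic.LoadInt32(&expCallbackCount) {
			panic("unexpected callback") // the real test calls t.FailNow()
		}
		callbackChan <- callbackCount
	}

	// Hypothetical stand-in for plugin registration: after this PR, the real
	// callback fires only once the endpoint is added and devices are updated.
	register := func() { go callback() }

	atomic.StoreInt32(&expCallbackCount, 1)
	register()
	fmt.Println("got callback", <-callbackChan) // blocks until the work is done; no sleep needed

	atomic.StoreInt32(&expCallbackCount, 2)
	register()
	fmt.Println("got callback", <-callbackChan)

	close(callbackChan)
}
```

Because the receive on `callbackChan` completes only after the callback (and, in the real code, the endpoint bookkeeping) has run, both the old polling loop with `time.Sleep(1)` and the trailing 5-second "catch unexpected callbacks" window become unnecessary.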