mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-14 13:45:06 +00:00
fix test flake due to dependency on time.Timer
This commit is contained in:
@@ -281,7 +281,7 @@ func taskRunning(t *Task) taskStateFn {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-t.shouldQuit:
|
case <-t.shouldQuit:
|
||||||
t.tryComplete(t.awaitDeath(defaultKillGracePeriod, waitCh))
|
t.tryComplete(t.awaitDeath(&realTimer{}, defaultKillGracePeriod, waitCh))
|
||||||
case wr := <-waitCh:
|
case wr := <-waitCh:
|
||||||
t.tryComplete(wr)
|
t.tryComplete(wr)
|
||||||
}
|
}
|
||||||
@@ -290,7 +290,9 @@ func taskRunning(t *Task) taskStateFn {
|
|||||||
|
|
||||||
// awaitDeath waits for the process to complete, or else for a "quit" signal on the task-
|
// awaitDeath waits for the process to complete, or else for a "quit" signal on the task-
|
||||||
// at which point we'll attempt to kill manually.
|
// at which point we'll attempt to kill manually.
|
||||||
func (t *Task) awaitDeath(gracePeriod time.Duration, waitCh <-chan *Completion) *Completion {
|
func (t *Task) awaitDeath(timer timer, gracePeriod time.Duration, waitCh <-chan *Completion) *Completion {
|
||||||
|
defer timer.discard()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case wr := <-waitCh:
|
case wr := <-waitCh:
|
||||||
// got a signal to quit, but we're already finished
|
// got a signal to quit, but we're already finished
|
||||||
@@ -318,10 +320,11 @@ waitLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the kill to be processed, and child proc resources cleaned up; try to avoid zombies!
|
// Wait for the kill to be processed, and child proc resources cleaned up; try to avoid zombies!
|
||||||
|
timer.set(gracePeriod)
|
||||||
select {
|
select {
|
||||||
case wr = <-waitCh:
|
case wr = <-waitCh:
|
||||||
break waitLoop
|
break waitLoop
|
||||||
case <-time.After(gracePeriod):
|
case <-timer.await():
|
||||||
// want a timeout, but a shorter one than we used initially.
|
// want a timeout, but a shorter one than we used initially.
|
||||||
// using /= 2 is deterministic and yields the desirable effect.
|
// using /= 2 is deterministic and yields the desirable effect.
|
||||||
gracePeriod /= 2
|
gracePeriod /= 2
|
||||||
|
@@ -24,6 +24,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/golang/glog"
|
log "github.com/golang/glog"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@@ -222,15 +223,28 @@ func TestMergeOutput(t *testing.T) {
|
|||||||
<-te.Done() // wait for the merge to complete
|
<-te.Done() // wait for the merge to complete
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type fakeTimer struct {
|
||||||
|
ch chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *fakeTimer) set(d time.Duration) {}
|
||||||
|
func (t *fakeTimer) discard() {}
|
||||||
|
func (t *fakeTimer) await() <-chan time.Time { return t.ch }
|
||||||
|
func (t *fakeTimer) expire() { t.ch = make(chan time.Time); close(t.ch) }
|
||||||
|
func (t *fakeTimer) reset() { t.ch = nil }
|
||||||
|
|
||||||
func TestAfterDeath(t *testing.T) {
|
func TestAfterDeath(t *testing.T) {
|
||||||
// test kill escalation since that's not covered by other unit tests
|
// test kill escalation since that's not covered by other unit tests
|
||||||
t1 := New("foo", "", nil, nil, devNull)
|
t1 := New("foo", "", nil, nil, devNull)
|
||||||
kills := 0
|
kills := 0
|
||||||
waitCh := make(chan *Completion, 1)
|
waitCh := make(chan *Completion, 1)
|
||||||
|
timer := &fakeTimer{}
|
||||||
|
timer.expire()
|
||||||
t1.killFunc = func(force bool) (int, error) {
|
t1.killFunc = func(force bool) (int, error) {
|
||||||
// > 0 is intentional, multiple calls to close() should panic
|
// > 0 is intentional, multiple calls to close() should panic
|
||||||
if kills > 0 {
|
if kills > 0 {
|
||||||
assert.True(t, force)
|
assert.True(t, force)
|
||||||
|
timer.reset() // don't want to race w/ waitCh
|
||||||
waitCh <- &Completion{name: t1.name, code: 123}
|
waitCh <- &Completion{name: t1.name, code: 123}
|
||||||
close(waitCh)
|
close(waitCh)
|
||||||
} else {
|
} else {
|
||||||
@@ -239,7 +253,7 @@ func TestAfterDeath(t *testing.T) {
|
|||||||
kills++
|
kills++
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
wr := t1.awaitDeath(0, waitCh)
|
wr := t1.awaitDeath(timer, 0, waitCh)
|
||||||
assert.Equal(t, "foo", wr.name)
|
assert.Equal(t, "foo", wr.name)
|
||||||
assert.Equal(t, 123, wr.code)
|
assert.Equal(t, 123, wr.code)
|
||||||
assert.NoError(t, wr.err)
|
assert.NoError(t, wr.err)
|
||||||
@@ -252,7 +266,9 @@ func TestAfterDeath(t *testing.T) {
|
|||||||
t.Fatalf("should not attempt to kill a task that has already reported completion")
|
t.Fatalf("should not attempt to kill a task that has already reported completion")
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
wr = t1.awaitDeath(0, waitCh)
|
|
||||||
|
timer.reset() // don't race w/ waitCh
|
||||||
|
wr = t1.awaitDeath(timer, 0, waitCh)
|
||||||
assert.Equal(t, 456, wr.code)
|
assert.Equal(t, 456, wr.code)
|
||||||
assert.NoError(t, wr.err)
|
assert.NoError(t, wr.err)
|
||||||
|
|
||||||
@@ -270,7 +286,8 @@ func TestAfterDeath(t *testing.T) {
|
|||||||
kills++
|
kills++
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
wr = t1.awaitDeath(0, nil)
|
timer.expire()
|
||||||
|
wr = t1.awaitDeath(timer, 0, nil)
|
||||||
assert.Equal(t, "foo", wr.name)
|
assert.Equal(t, "foo", wr.name)
|
||||||
assert.Error(t, wr.err)
|
assert.Error(t, wr.err)
|
||||||
|
|
||||||
@@ -287,7 +304,8 @@ func TestAfterDeath(t *testing.T) {
|
|||||||
kills++
|
kills++
|
||||||
return 0, killFailed
|
return 0, killFailed
|
||||||
}
|
}
|
||||||
wr = t1.awaitDeath(0, nil)
|
timer.expire()
|
||||||
|
wr = t1.awaitDeath(timer, 0, nil)
|
||||||
assert.Equal(t, "foo", wr.name)
|
assert.Equal(t, "foo", wr.name)
|
||||||
assert.Error(t, wr.err)
|
assert.Error(t, wr.err)
|
||||||
}
|
}
|
||||||
|
52
contrib/mesos/pkg/minion/tasks/timer.go
Normal file
52
contrib/mesos/pkg/minion/tasks/timer.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package tasks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type timer interface {
|
||||||
|
set(time.Duration)
|
||||||
|
discard()
|
||||||
|
await() <-chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type realTimer struct {
|
||||||
|
*time.Timer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *realTimer) set(d time.Duration) {
|
||||||
|
if t.Timer == nil {
|
||||||
|
t.Timer = time.NewTimer(d)
|
||||||
|
} else {
|
||||||
|
t.Reset(d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *realTimer) await() <-chan time.Time {
|
||||||
|
if t.Timer == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return t.C
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *realTimer) discard() {
|
||||||
|
if t.Timer != nil {
|
||||||
|
t.Stop()
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user