Merge pull request #47157 from wasylkowski/remove-rc-race-condition

Automatic merge from submit-queue (batch tested with PRs 47065, 47157, 47143)

Removed a race condition from ResourceConsumer

**What this PR does / why we need it**: Without this PR there is a race condition in ResourceConsumer that sometimes results in communication to pods that might not exist anymore.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #47127

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-08 13:43:14 -07:00 committed by GitHub
commit 409165bbad

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"strconv" "strconv"
"sync"
"time" "time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -86,6 +87,7 @@ type ResourceConsumer struct {
stopCPU chan int stopCPU chan int
stopMem chan int stopMem chan int
stopCustomMetric chan int stopCustomMetric chan int
stopWaitGroup sync.WaitGroup
consumptionTimeInSeconds int consumptionTimeInSeconds int
sleepTime time.Duration sleepTime time.Duration
requestSizeInMillicores int requestSizeInMillicores int
@ -167,6 +169,8 @@ func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {
func (rc *ResourceConsumer) makeConsumeCPURequests() { func (rc *ResourceConsumer) makeConsumeCPURequests() {
defer GinkgoRecover() defer GinkgoRecover()
rc.stopWaitGroup.Add(1)
defer rc.stopWaitGroup.Done()
sleepTime := time.Duration(0) sleepTime := time.Duration(0)
millicores := 0 millicores := 0
for { for {
@ -186,6 +190,8 @@ func (rc *ResourceConsumer) makeConsumeCPURequests() {
func (rc *ResourceConsumer) makeConsumeMemRequests() { func (rc *ResourceConsumer) makeConsumeMemRequests() {
defer GinkgoRecover() defer GinkgoRecover()
rc.stopWaitGroup.Add(1)
defer rc.stopWaitGroup.Done()
sleepTime := time.Duration(0) sleepTime := time.Duration(0)
megabytes := 0 megabytes := 0
for { for {
@ -205,6 +211,8 @@ func (rc *ResourceConsumer) makeConsumeMemRequests() {
func (rc *ResourceConsumer) makeConsumeCustomMetric() { func (rc *ResourceConsumer) makeConsumeCustomMetric() {
defer GinkgoRecover() defer GinkgoRecover()
rc.stopWaitGroup.Add(1)
defer rc.stopWaitGroup.Done()
sleepTime := time.Duration(0) sleepTime := time.Duration(0)
delta := 0 delta := 0
for { for {
@ -364,6 +372,7 @@ func (rc *ResourceConsumer) Pause() {
rc.stopCPU <- 0 rc.stopCPU <- 0
rc.stopMem <- 0 rc.stopMem <- 0
rc.stopCustomMetric <- 0 rc.stopCustomMetric <- 0
rc.stopWaitGroup.Wait()
} }
// Pause starts background goroutines responsible for consuming resources. // Pause starts background goroutines responsible for consuming resources.
@ -379,6 +388,7 @@ func (rc *ResourceConsumer) CleanUp() {
close(rc.stopCPU) close(rc.stopCPU)
close(rc.stopMem) close(rc.stopMem)
close(rc.stopCustomMetric) close(rc.stopCustomMetric)
rc.stopWaitGroup.Wait()
// Wait some time to ensure all child goroutines are finished. // Wait some time to ensure all child goroutines are finished.
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
kind, err := kindOf(rc.kind) kind, err := kindOf(rc.kind)