Merge pull request #24838 from saad-ali/attachControllerOpMap

Automatic merge from submit-queue

Add data structure for managing go routines by name

This PR introduces a data structure for managing go routines by name. It prevents the creation of new go routines if an existing go routine with the same name exists. This will enable parallelization of the designs in https://github.com/kubernetes/kubernetes/issues/20262 and https://github.com/kubernetes/kubernetes/issues/21931 with sufficient protection to prevent starting multiple operations on the same volume.
This commit is contained in:
k8s-merge-robot 2016-05-09 06:31:22 -07:00
commit 6596ae0c61
2 changed files with 273 additions and 0 deletions

View File

@ -0,0 +1,76 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package goroutinemap implements a data structure for managing go routines
by name. It prevents the creation of new go routines if an existing go routine
with the same name exists.
*/
package goroutinemap
import (
"fmt"
"sync"
"k8s.io/kubernetes/pkg/util/runtime"
)
// GoRoutineMap defines the supported set of operations.
type GoRoutineMap interface {
// NewGoRoutine adds operationName to the list of running operations and
// spawns a new go routine to execute the operation. If an operation with
// the same name already exists, an error is returned. Once the operation
// is complete, the go routine is terminated and the operationName is
// removed from the list of executing operations allowing a new operation
// to be started with the same name without error.
NewGoRoutine(operationName string, operation func() error) error
}
// NewGoRoutineMap returns a new instance of GoRoutineMap.
func NewGoRoutineMap() GoRoutineMap {
return &goRoutineMap{
operations: make(map[string]bool),
}
}
type goRoutineMap struct {
operations map[string]bool
sync.Mutex
}
func (grm *goRoutineMap) NewGoRoutine(operationName string, operation func() error) error {
grm.Lock()
defer grm.Unlock()
if grm.operations[operationName] {
// Operation with name exists
return fmt.Errorf("Failed to create operation with name %q. An operation with that name already exists.", operationName)
}
grm.operations[operationName] = true
go func() {
defer grm.operationComplete(operationName)
defer runtime.HandleCrash()
operation()
}()
return nil
}
func (grm *goRoutineMap) operationComplete(operationName string) {
grm.Lock()
defer grm.Unlock()
delete(grm.operations, operationName)
}

View File

@ -0,0 +1,197 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package goroutinemap
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/util/wait"
)
func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
// Arrange
grm := NewGoRoutineMap()
operationName := "operation-name"
operation := func() error { return nil }
// Act
err := grm.NewGoRoutine(operationName, operation)
// Assert
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
}
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
// Arrange
grm := NewGoRoutineMap()
operationName := "operation-name"
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.NewGoRoutine(operationName, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
<-operation1DoneCh // Force operation1 to complete
// Act
err2 := retryWithExponentialBackOff(
time.Duration(20*time.Millisecond),
func() (bool, error) {
err := grm.NewGoRoutine(operationName, operation2)
if err != nil {
t.Logf("Warning: NewGoRoutine failed. Expected: <no error> Actual: <%v>. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
// Arrange
grm := NewGoRoutineMap()
operationName := "operation-name"
operation1 := generatePanicFunc()
err1 := grm.NewGoRoutine(operationName, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := retryWithExponentialBackOff(
time.Duration(20*time.Millisecond),
func() (bool, error) {
err := grm.NewGoRoutine(operationName, operation2)
if err != nil {
t.Logf("Warning: NewGoRoutine failed. Expected: <no error> Actual: <%v>. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
// Arrange
grm := NewGoRoutineMap()
operationName := "operation-name"
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.NewGoRoutine(operationName, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.NewGoRoutine(operationName, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
}
}
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
// Arrange
grm := NewGoRoutineMap()
operationName := "operation-name"
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.NewGoRoutine(operationName, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
operation3 := generateNoopFunc()
// Act
err2 := grm.NewGoRoutine(operationName, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
}
// Act
operation1DoneCh <- true // Force operation1 to complete
err3 := retryWithExponentialBackOff(
time.Duration(20*time.Millisecond),
func() (bool, error) {
err := grm.NewGoRoutine(operationName, operation3)
if err != nil {
t.Logf("Warning: NewGoRoutine failed. Expected: <no error> Actual: <%v>. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err3 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
}
}
func generateCallbackFunc(done chan<- interface{}) func() error {
return func() error {
done <- true
return nil
}
}
func generateWaitFunc(done <-chan interface{}) func() error {
return func() error {
<-done
return nil
}
}
func generatePanicFunc() func() error {
return func() error {
panic("testing panic")
}
}
func generateNoopFunc() func() error {
return func() error { return nil }
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Duration: initialDuration,
Factor: 3,
Jitter: 0,
Steps: 4,
}
return wait.ExponentialBackoff(backoff, fn)
}