From 53bb527e352f385a2b5e4e95a4d09626350c70be Mon Sep 17 00:00:00 2001 From: saadali Date: Tue, 26 Apr 2016 18:07:38 -0700 Subject: [PATCH] Introduce a data structure for managing go routines by unique name. --- pkg/util/goroutinemap/goroutinemap.go | 76 ++++++++ pkg/util/goroutinemap/goroutinemap_test.go | 197 +++++++++++++++++++++ 2 files changed, 273 insertions(+) create mode 100644 pkg/util/goroutinemap/goroutinemap.go create mode 100644 pkg/util/goroutinemap/goroutinemap_test.go diff --git a/pkg/util/goroutinemap/goroutinemap.go b/pkg/util/goroutinemap/goroutinemap.go new file mode 100644 index 00000000000..277f7809d74 --- /dev/null +++ b/pkg/util/goroutinemap/goroutinemap.go @@ -0,0 +1,76 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package goroutinemap implements a data structure for managing go routines +by name. It prevents the creation of new go routines if an existing go routine +with the same name exists. +*/ +package goroutinemap + +import ( + "fmt" + "sync" + + "k8s.io/kubernetes/pkg/util/runtime" +) + +// GoRoutineMap defines the supported set of operations. +type GoRoutineMap interface { + // NewGoRoutine adds operationName to the list of running operations and + // spawns a new go routine to execute the operation. If an operation with + // the same name already exists, an error is returned. Once the operation + // is complete, the go routine is terminated and the operationName is + // removed from the list of executing operations allowing a new operation + // to be started with the same name without error. + NewGoRoutine(operationName string, operation func() error) error +} + +// NewGoRoutineMap returns a new instance of GoRoutineMap. +func NewGoRoutineMap() GoRoutineMap { + return &goRoutineMap{ + operations: make(map[string]bool), + } +} + +type goRoutineMap struct { + operations map[string]bool + sync.Mutex +} + +func (grm *goRoutineMap) NewGoRoutine(operationName string, operation func() error) error { + grm.Lock() + defer grm.Unlock() + if grm.operations[operationName] { + // Operation with name exists + return fmt.Errorf("Failed to create operation with name %q. An operation with that name already exists.", operationName) + } + + grm.operations[operationName] = true + go func() { + defer grm.operationComplete(operationName) + defer runtime.HandleCrash() + operation() + }() + + return nil +} + +func (grm *goRoutineMap) operationComplete(operationName string) { + grm.Lock() + defer grm.Unlock() + delete(grm.operations, operationName) +} diff --git a/pkg/util/goroutinemap/goroutinemap_test.go b/pkg/util/goroutinemap/goroutinemap_test.go new file mode 100644 index 00000000000..48f7cf6e8b0 --- /dev/null +++ b/pkg/util/goroutinemap/goroutinemap_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package goroutinemap + +import ( + "testing" + "time" + + "k8s.io/kubernetes/pkg/util/wait" +) + +func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { + // Arrange + grm := NewGoRoutineMap() + operationName := "operation-name" + operation := func() error { return nil } + + // Act + err := grm.NewGoRoutine(operationName, operation) + + // Assert + if err != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + } +} + +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { + // Arrange + grm := NewGoRoutineMap() + operationName := "operation-name" + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateCallbackFunc(operation1DoneCh) + err1 := grm.NewGoRoutine(operationName, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + <-operation1DoneCh // Force operation1 to complete + + // Act + err2 := retryWithExponentialBackOff( + time.Duration(20*time.Millisecond), + func() (bool, error) { + err := grm.NewGoRoutine(operationName, operation2) + if err != nil { + t.Logf("Warning: NewGoRoutine failed. Expected: Actual: <%v>. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err2 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + } +} + +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { + // Arrange + grm := NewGoRoutineMap() + operationName := "operation-name" + operation1 := generatePanicFunc() + err1 := grm.NewGoRoutine(operationName, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := retryWithExponentialBackOff( + time.Duration(20*time.Millisecond), + func() (bool, error) { + err := grm.NewGoRoutine(operationName, operation2) + if err != nil { + t.Logf("Warning: NewGoRoutine failed. Expected: Actual: <%v>. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err2 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + } +} + +func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { + // Arrange + grm := NewGoRoutineMap() + operationName := "operation-name" + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.NewGoRoutine(operationName, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := grm.NewGoRoutine(operationName, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", operationName) + } +} + +func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { + // Arrange + grm := NewGoRoutineMap() + operationName := "operation-name" + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.NewGoRoutine(operationName, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + operation3 := generateNoopFunc() + + // Act + err2 := grm.NewGoRoutine(operationName, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", operationName) + } + + // Act + operation1DoneCh <- true // Force operation1 to complete + err3 := retryWithExponentialBackOff( + time.Duration(20*time.Millisecond), + func() (bool, error) { + err := grm.NewGoRoutine(operationName, operation3) + if err != nil { + t.Logf("Warning: NewGoRoutine failed. Expected: Actual: <%v>. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err3 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) + } +} + +func generateCallbackFunc(done chan<- interface{}) func() error { + return func() error { + done <- true + return nil + } +} + +func generateWaitFunc(done <-chan interface{}) func() error { + return func() error { + <-done + return nil + } +} + +func generatePanicFunc() func() error { + return func() error { + panic("testing panic") + } +} + +func generateNoopFunc() func() error { + return func() error { return nil } +} + +func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { + backoff := wait.Backoff{ + Duration: initialDuration, + Factor: 3, + Jitter: 0, + Steps: 4, + } + return wait.ExponentialBackoff(backoff, fn) +}