Work around for PDs stop mounting after a few hours issue

This commit is contained in:
saadali
2015-06-21 19:39:17 -07:00
parent ff0546da4f
commit c952ee23a7
3 changed files with 439 additions and 35 deletions

View File

@@ -0,0 +1,92 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationmanager
import (
"fmt"
"sync"
)
// Operation Manager is a thread-safe interface for keeping track of multiple pending async operations.
type OperationManager interface {
// Called when the operation with the given ID has started.
// Creates a new channel with specified buffer size tracked with the specified ID.
// Returns a read-only version of the newly created channel.
// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
Start(id string, bufferSize uint) (<-chan interface{}, error)
// Called when the operation with the given ID has terminated.
// Closes and removes the channel associated with ID.
// Returns an error if no associated channel exists.
Close(id string) error
// Attempts to send msg to the channel associated with ID.
// Returns an error if no associated channel exists.
Send(id string, msg interface{}) error
}
// Returns a new instance of a channel manager.
func NewOperationManager() OperationManager {
return &operationManager{
chanMap: make(map[string]chan interface{}),
}
}
type operationManager struct {
sync.RWMutex
chanMap map[string]chan interface{}
}
// Called when the operation with the given ID has started.
// Creates a new channel with specified buffer size tracked with the specified ID.
// Returns a read-only version of the newly created channel.
// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
func (cm *operationManager) Start(id string, bufferSize uint) (<-chan interface{}, error) {
cm.Lock()
defer cm.Unlock()
if _, exists := cm.chanMap[id]; exists {
return nil, fmt.Errorf("id %q already exists", id)
}
cm.chanMap[id] = make(chan interface{}, bufferSize)
return cm.chanMap[id], nil
}
// Called when the operation with the given ID has terminated.
// Closes and removes the channel associated with ID.
// Returns an error if no associated channel exists.
func (cm *operationManager) Close(id string) error {
cm.Lock()
defer cm.Unlock()
if _, exists := cm.chanMap[id]; !exists {
return fmt.Errorf("id %q not found", id)
}
close(cm.chanMap[id])
delete(cm.chanMap, id)
return nil
}
// Attempts to send msg to the channel associated with ID.
// Returns an error if no associated channel exists.
func (cm *operationManager) Send(id string, msg interface{}) error {
cm.RLock()
defer cm.RUnlock()
if _, exists := cm.chanMap[id]; !exists {
return fmt.Errorf("id %q not found", id)
}
cm.chanMap[id] <- msg
return nil
}

View File

@@ -0,0 +1,139 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Channel Manager keeps track of multiple channels
package operationmanager
import (
"testing"
)
func TestStart(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId := "testChanId"
testMsg := "test message"
// Act
ch, startErr := cm.Start(chanId, 1 /* bufferSize */)
sigErr := cm.Send(chanId, testMsg)
// Assert
if startErr != nil {
t.Fatalf("Unexpected error on Start. Expected: <no error> Actual: <%v>", startErr)
}
if sigErr != nil {
t.Fatalf("Unexpected error on Send. Expected: <no error> Actual: <%v>", sigErr)
}
if actual := <-ch; actual != testMsg {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg, actual)
}
}
func TestStartIdExists(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId := "testChanId"
// Act
_, startErr1 := cm.Start(chanId, 1 /* bufferSize */)
_, startErr2 := cm.Start(chanId, 1 /* bufferSize */)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 == nil {
t.Fatalf("Expected error on Start2. Expected: <id already exists error> Actual: <no error>")
}
}
func TestStartAndAdd2Chans(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId1 := "testChanId1"
chanId2 := "testChanId2"
testMsg1 := "test message 1"
testMsg2 := "test message 2"
// Act
ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
sigErr1 := cm.Send(chanId1, testMsg1)
sigErr2 := cm.Send(chanId2, testMsg2)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 != nil {
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
}
if sigErr1 != nil {
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
}
if sigErr2 != nil {
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
}
if actual := <-ch1; actual != testMsg1 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
}
if actual := <-ch2; actual != testMsg2 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
}
}
func TestStartAndAdd2ChansAndClose(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId1 := "testChanId1"
chanId2 := "testChanId2"
testMsg1 := "test message 1"
testMsg2 := "test message 2"
// Act
ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
sigErr1 := cm.Send(chanId1, testMsg1)
sigErr2 := cm.Send(chanId2, testMsg2)
cm.Close(chanId1)
sigErr3 := cm.Send(chanId1, testMsg1)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 != nil {
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
}
if sigErr1 != nil {
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
}
if sigErr2 != nil {
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
}
if sigErr3 == nil {
t.Fatalf("Expected error on Send3. Expected: <error> Actual: <no error>", sigErr2)
}
if actual := <-ch1; actual != testMsg1 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
}
if actual := <-ch2; actual != testMsg2 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
}
}