mirror of https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00

Merge pull request #16053 from saad-ali/attachDetachMutextFix

Fix GCE Cloud/Attach/Detach stability issues

This commit is contained in:
commit b07dd73f26
@@ -55,6 +55,9 @@ const (
 	gceAffinityTypeClientIP = "CLIENT_IP"
 	// AffinityTypeClientIPProto - affinity based on Client IP and port.
 	gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO"
+
+	operationPollInterval        = 3 * time.Second
+	operationPollTimeoutDuration = 30 * time.Minute
 )

 // GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
@@ -259,48 +262,57 @@ func (gce *GCECloud) targetPoolURL(name, region string) string {
 	return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
 }

-func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, error)) error {
-	pollOp := op
-	consecPollFails := 0
-	for pollOp.Status != "DONE" {
-		var err error
-		time.Sleep(3 * time.Second)
-		pollOp, err = getOperation()
-		if err != nil {
-			if consecPollFails == 2 {
-				// Only bail if we've seen 3 consecutive polling errors.
-				return err
-			}
-			consecPollFails++
-		} else {
-			consecPollFails = 0
-		}
-	}
-	if pollOp.Error != nil && len(pollOp.Error.Errors) > 0 {
-		return &googleapi.Error{
-			Code:    int(pollOp.HttpErrorStatusCode),
-			Message: pollOp.Error.Errors[0].Message,
-		}
-	}
-	return nil
-}
+func waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error {
+	if op == nil {
+		return fmt.Errorf("operation must not be nil")
+	}
+
+	if opIsDone(op) {
+		return getErrorFromOp(op)
+	}
+
+	opName := op.Name
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		pollOp, err := getOperation(opName)
+		if err != nil {
+			glog.Warningf("GCE poll operation failed: %v", err)
+		}
+		return opIsDone(pollOp), getErrorFromOp(pollOp)
+	})
+}
+
+func opIsDone(op *compute.Operation) bool {
+	return op != nil && op.Status == "DONE"
+}
+
+func getErrorFromOp(op *compute.Operation) error {
+	if op != nil && op.Error != nil && len(op.Error.Errors) > 0 {
+		err := &googleapi.Error{
+			Code:    int(op.HttpErrorStatusCode),
+			Message: op.Error.Errors[0].Message,
+		}
+		glog.Errorf("GCE operation failed: %v", err)
+		return err
+	}
+
+	return nil
+}

 func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error {
-	return waitForOp(op, func() (*compute.Operation, error) {
-		return gce.service.GlobalOperations.Get(gce.projectID, op.Name).Do()
+	return waitForOp(op, func(operationName string) (*compute.Operation, error) {
+		return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do()
 	})
 }

 func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
-	return waitForOp(op, func() (*compute.Operation, error) {
-		return gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do()
+	return waitForOp(op, func(operationName string) (*compute.Operation, error) {
+		return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do()
 	})
 }

 func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
-	return waitForOp(op, func() (*compute.Operation, error) {
-		return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, op.Name).Do()
+	return waitForOp(op, func(operationName string) (*compute.Operation, error) {
+		return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, operationName).Do()
 	})
 }
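The rewritten waitForOp hands polling off to wait.Poll with the new operationPollInterval and operationPollTimeoutDuration constants, so a stuck GCE operation now fails after a bounded timeout instead of retrying forever. The standalone sketch below (plain Go, not the Kubernetes wait package; poll and conditionFunc are illustrative names only) shows the same poll-until-done-or-timeout shape:

package main

import (
	"fmt"
	"time"
)

// conditionFunc mirrors the shape wait.Poll expects: report done and/or an error.
type conditionFunc func() (bool, error)

// poll calls cond every interval until it reports done, returns an error,
// or the overall timeout elapses. Simplified for illustration.
func poll(interval, timeout time.Duration, cond conditionFunc) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out after %v", timeout)
		}
		time.Sleep(interval)
	}
}

func main() {
	// Pretend GCE operation that reports DONE on the third poll.
	polls := 0
	err := poll(10*time.Millisecond, time.Second, func() (bool, error) {
		polls++
		return polls >= 3, nil
	})
	fmt.Printf("polled %d times, err=%v\n", polls, err) // polled 3 times, err=<nil>
}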
@@ -1456,20 +1468,9 @@ func (gce *GCECloud) AttachDisk(diskName string, readOnly bool) error {
 	attachedDisk := gce.convertDiskToAttachedDisk(disk, readWrite)

 	attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, gce.zone, gce.instanceID, attachedDisk).Do()
 	if err != nil {
-		// Check if the disk is already attached to this instance. We do this only
-		// in the error case, since it is expected to be exceptional.
-		instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, gce.instanceID).Do()
-		if err != nil {
-			return err
-		}
-		for _, disk := range instance.Disks {
-			if disk.Source == attachedDisk.Source {
-				// Disk is already attached, we're good to go.
-				return nil
-			}
-		}
+		return err
 	}

 	return gce.waitForZoneOp(attachOp)
 }
pkg/util/keymutex/keymutex.go (new file, 82 lines)
@@ -0,0 +1,82 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package keymutex

import (
	"fmt"
	"github.com/golang/glog"
	"sync"
)

// KeyMutex is a thread-safe interface for aquiring locks on arbitrary strings.
type KeyMutex interface {
	// Aquires a lock associated with the specified ID, creates the lock if one doesn't already exist.
	LockKey(id string)

	// Releases the lock associated with the specified ID.
	// Returns an error if the specified ID doesn't exist.
	UnlockKey(id string) error
}

// Returns a new instance of a key mutex.
func NewKeyMutex() KeyMutex {
	return &keyMutex{
		mutexMap: make(map[string]*sync.Mutex),
	}
}

type keyMutex struct {
	sync.RWMutex
	mutexMap map[string]*sync.Mutex
}

// Aquires a lock associated with the specified ID (creates the lock if one doesn't already exist).
func (km *keyMutex) LockKey(id string) {
	glog.V(5).Infof("LockKey(...) called for id %q\r\n", id)
	mutex := km.getOrCreateLock(id)
	mutex.Lock()
	glog.V(5).Infof("LockKey(...) for id %q completed.\r\n", id)
}

// Releases the lock associated with the specified ID.
// Returns an error if the specified ID doesn't exist.
func (km *keyMutex) UnlockKey(id string) error {
	glog.V(5).Infof("UnlockKey(...) called for id %q\r\n", id)
	km.RLock()
	defer km.RUnlock()
	mutex, exists := km.mutexMap[id]
	if !exists {
		return fmt.Errorf("id %q not found", id)
	}
	glog.V(5).Infof("UnlockKey(...) for id. Mutex found, trying to unlock it. %q\r\n", id)

	mutex.Unlock()
	glog.V(5).Infof("UnlockKey(...) for id %q completed.\r\n", id)
	return nil
}

// Returns lock associated with the specified ID, or creates the lock if one doesn't already exist.
func (km *keyMutex) getOrCreateLock(id string) *sync.Mutex {
	km.Lock()
	defer km.Unlock()

	if _, exists := km.mutexMap[id]; !exists {
		km.mutexMap[id] = &sync.Mutex{}
	}

	return km.mutexMap[id]
}
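The keymutex package above gives callers one mutex per string key: the first LockKey for an ID lazily creates the mutex, later LockKey calls on the same ID block until UnlockKey releases it, and UnlockKey on an unknown ID returns an error. A small usage sketch follows (the key name "my-pd" is hypothetical; it assumes the file above is importable as k8s.io/kubernetes/pkg/util/keymutex, the path this commit adds):

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/keymutex"
)

func main() {
	km := keymutex.NewKeyMutex()

	// First lock on the key succeeds immediately and creates the mutex.
	km.LockKey("my-pd")

	// Release the key from another goroutine after a short delay.
	go func() {
		time.Sleep(50 * time.Millisecond)
		if err := km.UnlockKey("my-pd"); err != nil {
			fmt.Println("unlock failed:", err)
		}
	}()

	// Second lock on the same key blocks until the goroutine unlocks it,
	// which is how concurrent attach/detach calls for one PD serialize.
	km.LockKey("my-pd")
	fmt.Println("reacquired lock for my-pd")
	km.UnlockKey("my-pd")
}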
pkg/util/keymutex/keymutex_test.go (new file, 111 lines)
@@ -0,0 +1,111 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package keymutex

import (
	"testing"
	"time"
)

const (
	callbackTimeout = 1 * time.Second
)

func Test_SingleLock_NoUnlock(t *testing.T) {
	// Arrange
	km := NewKeyMutex()
	key := "fakeid"
	callbackCh := make(chan interface{})

	// Act
	go lockAndCallback(km, key, callbackCh)

	// Assert
	verifyCallbackHappens(t, callbackCh)
}

func Test_SingleLock_SingleUnlock(t *testing.T) {
	// Arrange
	km := NewKeyMutex()
	key := "fakeid"
	callbackCh := make(chan interface{})

	// Act & Assert
	go lockAndCallback(km, key, callbackCh)
	verifyCallbackHappens(t, callbackCh)
	km.UnlockKey(key)
}

func Test_DoubleLock_DoubleUnlock(t *testing.T) {
	// Arrange
	km := NewKeyMutex()
	key := "fakeid"
	callbackCh1stLock := make(chan interface{})
	callbackCh2ndLock := make(chan interface{})

	// Act & Assert
	go lockAndCallback(km, key, callbackCh1stLock)
	verifyCallbackHappens(t, callbackCh1stLock)
	go lockAndCallback(km, key, callbackCh2ndLock)
	verifyCallbackDoesntHappens(t, callbackCh2ndLock)
	km.UnlockKey(key)
	verifyCallbackHappens(t, callbackCh2ndLock)
	km.UnlockKey(key)
}

func lockAndCallback(km KeyMutex, id string, callbackCh chan<- interface{}) {
	km.LockKey(id)
	callbackCh <- true
}

func verifyCallbackHappens(t *testing.T, callbackCh <-chan interface{}) bool {
	select {
	case <-callbackCh:
		return true
	case <-time.After(callbackTimeout):
		t.Fatalf("Timed out waiting for callback.")
		return false
	}
}

func verifyCallbackDoesntHappens(t *testing.T, callbackCh <-chan interface{}) bool {
	select {
	case <-callbackCh:
		t.Fatalf("Unexpected callback.")
		return false
	case <-time.After(callbackTimeout):
		return true
	}
}

func verifyNoError(t *testing.T, err error, name string) {
	if err != nil {
		t.Fatalf("Unexpected response on %q. Expected: <no error> Actual: <%v>", name, err)
	}
}

func verifyError(t *testing.T, err error, name string) {
	if err == nil {
		t.Fatalf("Unexpected response on %q. Expected: <error> Actual: <no error>", name)
	}
}

func verifyMsg(t *testing.T, expected, actual string) {
	if actual != expected {
		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", expected, actual)
	}
}
@@ -1,103 +0,0 @@ (deleted file, package operationmanager)
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operationmanager

import (
	"fmt"
	"sync"
)

// Operation Manager is a thread-safe interface for keeping track of multiple pending async operations.
type OperationManager interface {
	// Called when the operation with the given ID has started.
	// Creates a new channel with specified buffer size tracked with the specified ID.
	// Returns a read-only version of the newly created channel.
	// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
	Start(id string, bufferSize uint) (<-chan interface{}, error)

	// Called when the operation with the given ID has terminated.
	// Closes and removes the channel associated with ID.
	// Returns an error if no associated channel exists.
	Close(id string) error

	// Attempts to send msg to the channel associated with ID.
	// Returns an error if no associated channel exists.
	Send(id string, msg interface{}) error

	// Returns true if an entry with the specified ID already exists.
	Exists(id string) bool
}

// Returns a new instance of a channel manager.
func NewOperationManager() OperationManager {
	return &operationManager{
		chanMap: make(map[string]chan interface{}),
	}
}

type operationManager struct {
	sync.RWMutex
	chanMap map[string]chan interface{}
}

// Called when the operation with the given ID has started.
// Creates a new channel with specified buffer size tracked with the specified ID.
// Returns a read-only version of the newly created channel.
// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
func (cm *operationManager) Start(id string, bufferSize uint) (<-chan interface{}, error) {
	cm.Lock()
	defer cm.Unlock()
	if _, exists := cm.chanMap[id]; exists {
		return nil, fmt.Errorf("id %q already exists", id)
	}
	cm.chanMap[id] = make(chan interface{}, bufferSize)
	return cm.chanMap[id], nil
}

// Called when the operation with the given ID has terminated.
// Closes and removes the channel associated with ID.
// Returns an error if no associated channel exists.
func (cm *operationManager) Close(id string) error {
	cm.Lock()
	defer cm.Unlock()
	if _, exists := cm.chanMap[id]; !exists {
		return fmt.Errorf("id %q not found", id)
	}
	close(cm.chanMap[id])
	delete(cm.chanMap, id)
	return nil
}

// Attempts to send msg to the channel associated with ID.
// Returns an error if no associated channel exists.
func (cm *operationManager) Send(id string, msg interface{}) error {
	cm.RLock()
	defer cm.RUnlock()
	if _, exists := cm.chanMap[id]; !exists {
		return fmt.Errorf("id %q not found", id)
	}
	cm.chanMap[id] <- msg
	return nil
}

// Returns true if an entry with the specified ID already exists.
func (cm *operationManager) Exists(id string) (exists bool) {
	cm.RLock()
	defer cm.RUnlock()
	_, exists = cm.chanMap[id]
	return
}
@@ -1,159 +0,0 @@ (deleted file, operationmanager tests)
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Channel Manager keeps track of multiple channels
package operationmanager

import (
	"testing"
)

func TestStart(t *testing.T) {
	// Arrange
	cm := NewOperationManager()
	chanId := "testChanId"
	testMsg := "test message"

	// Act
	ch, startErr := cm.Start(chanId, 1 /* bufferSize */)
	sigErr := cm.Send(chanId, testMsg)

	// Assert
	verifyNoError(t, startErr, "Start")
	verifyNoError(t, sigErr, "Send")
	actualMsg := <-ch
	verifyMsg(t, testMsg /* expected */, actualMsg.(string) /* actual */)
}

func TestStartIdExists(t *testing.T) {
	// Arrange
	cm := NewOperationManager()
	chanId := "testChanId"

	// Act
	_, startErr1 := cm.Start(chanId, 1 /* bufferSize */)
	_, startErr2 := cm.Start(chanId, 1 /* bufferSize */)

	// Assert
	verifyNoError(t, startErr1, "Start1")
	verifyError(t, startErr2, "Start2")
}

func TestStartAndAdd2Chans(t *testing.T) {
	// Arrange
	cm := NewOperationManager()
	chanId1 := "testChanId1"
	chanId2 := "testChanId2"
	testMsg1 := "test message 1"
	testMsg2 := "test message 2"

	// Act
	ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
	ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
	sigErr1 := cm.Send(chanId1, testMsg1)
	sigErr2 := cm.Send(chanId2, testMsg2)

	// Assert
	verifyNoError(t, startErr1, "Start1")
	verifyNoError(t, startErr2, "Start2")
	verifyNoError(t, sigErr1, "Send1")
	verifyNoError(t, sigErr2, "Send2")
	actualMsg1 := <-ch1
	actualMsg2 := <-ch2
	verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
	verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
}

func TestStartAndAdd2ChansAndClose(t *testing.T) {
	// Arrange
	cm := NewOperationManager()
	chanId1 := "testChanId1"
	chanId2 := "testChanId2"
	testMsg1 := "test message 1"
	testMsg2 := "test message 2"

	// Act
	ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
	ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
	sigErr1 := cm.Send(chanId1, testMsg1)
	sigErr2 := cm.Send(chanId2, testMsg2)
	cm.Close(chanId1)
	sigErr3 := cm.Send(chanId1, testMsg1)

	// Assert
	verifyNoError(t, startErr1, "Start1")
	verifyNoError(t, startErr2, "Start2")
	verifyNoError(t, sigErr1, "Send1")
	verifyNoError(t, sigErr2, "Send2")
	verifyError(t, sigErr3, "Send3")
	actualMsg1 := <-ch1
	actualMsg2 := <-ch2
	verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
	verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
}

func TestExists(t *testing.T) {
	// Arrange
	cm := NewOperationManager()
	chanId1 := "testChanId1"
	chanId2 := "testChanId2"

	// Act & Assert
	verifyExists(t, cm, chanId1, false /* expected */)
	verifyExists(t, cm, chanId2, false /* expected */)

	_, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
	verifyNoError(t, startErr1, "Start1")
	verifyExists(t, cm, chanId1, true /* expected */)
	verifyExists(t, cm, chanId2, false /* expected */)

	_, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
	verifyNoError(t, startErr2, "Start2")
	verifyExists(t, cm, chanId1, true /* expected */)
	verifyExists(t, cm, chanId2, true /* expected */)

	cm.Close(chanId1)
	verifyExists(t, cm, chanId1, false /* expected */)
	verifyExists(t, cm, chanId2, true /* expected */)

	cm.Close(chanId2)
	verifyExists(t, cm, chanId1, false /* expected */)
	verifyExists(t, cm, chanId2, false /* expected */)
}

func verifyExists(t *testing.T, cm OperationManager, id string, expected bool) {
	if actual := cm.Exists(id); expected != actual {
		t.Fatalf("Unexpected Exists(%q) response. Expected: <%v> Actual: <%v>", id, expected, actual)
	}
}

func verifyNoError(t *testing.T, err error, name string) {
	if err != nil {
		t.Fatalf("Unexpected response on %q. Expected: <no error> Actual: <%v>", name, err)
	}
}

func verifyError(t *testing.T, err error, name string) {
	if err == nil {
		t.Fatalf("Unexpected response on %q. Expected: <error> Actual: <no error>", name)
	}
}

func verifyMsg(t *testing.T, expected, actual string) {
	if actual != expected {
		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", expected, actual)
	}
}
@@ -29,7 +29,7 @@ import (
 	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/exec"
-	"k8s.io/kubernetes/pkg/util/operationmanager"
+	"k8s.io/kubernetes/pkg/util/keymutex"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
@@ -46,18 +46,21 @@ const (
 	errorSleepDuration = 5 * time.Second
 )

-// Singleton operation manager for managing detach clean up go routines
-var detachCleanupManager = operationmanager.NewOperationManager()
+// Singleton key mutex for keeping attach/detach operations for the same PD atomic
+var attachDetachMutex = keymutex.NewKeyMutex()

 type GCEDiskUtil struct{}

 // Attaches a disk specified by a volume.GCEPersistentDisk to the current kubelet.
 // Mounts the disk to it's global path.
 func (diskUtil *GCEDiskUtil) AttachAndMountDisk(b *gcePersistentDiskBuilder, globalPDPath string) error {
-	glog.V(5).Infof("AttachAndMountDisk(b, %q) where b is %#v\r\n", globalPDPath, b)
+	glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.pdName, globalPDPath)

-	// Block execution until any pending detach goroutines for this pd have completed
-	detachCleanupManager.Send(b.pdName, true)
+	// Block execution until any pending detach operations for this PD have completed
+	attachDetachMutex.LockKey(b.pdName)
+	defer attachDetachMutex.UnlockKey(b.pdName)
+
+	glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.pdName, globalPDPath)

 	sdBefore, err := filepath.Glob(diskSDPattern)
 	if err != nil {
@@ -98,24 +101,13 @@ func (diskUtil *GCEDiskUtil) AttachAndMountDisk(b *gcePersistentDiskBuilder, glo

 // Unmounts the device and detaches the disk from the kubelet's host machine.
 func (util *GCEDiskUtil) DetachDisk(c *gcePersistentDiskCleaner) error {
-	// Unmount the global PD mount, which should be the only one.
-	globalPDPath := makeGlobalPDName(c.plugin.host, c.pdName)
-	glog.V(5).Infof("DetachDisk(c) where c is %#v and the globalPDPath is %q\r\n", c, globalPDPath)
+	glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.pdName)

-	if err := c.mounter.Unmount(globalPDPath); err != nil {
-		return err
-	}
-	if err := os.Remove(globalPDPath); err != nil {
-		return err
+	if err := unmountPDAndRemoveGlobalPath(c); err != nil {
+		glog.Errorf("Error unmounting PD %q: %v", c.pdName, err)
 	}

-	if detachCleanupManager.Exists(c.pdName) {
-		glog.Warningf("Terminating new DetachDisk call for GCE PD %q. A previous detach call for this PD is still pending.", c.pdName)
-		return nil
-
-	}
-
-	// Detach disk, retry if needed.
+	// Detach disk asynchronously so that the kubelet sync loop is not blocked.
 	go detachDiskAndVerify(c)
 	return nil
 }
@@ -125,9 +117,6 @@ func attachDiskAndVerify(b *gcePersistentDiskBuilder, sdBeforeSet sets.String) (
 	devicePaths := getDiskByIdPaths(b.gcePersistentDisk)
 	var gceCloud *gcecloud.GCECloud
 	for numRetries := 0; numRetries < maxRetries; numRetries++ {
-		// Block execution until any pending detach goroutines for this pd have completed
-		detachCleanupManager.Send(b.pdName, true)
-
 		var err error
 		if gceCloud == nil {
 			gceCloud, err = getCloudProvider()
@@ -140,11 +129,10 @@ func attachDiskAndVerify(b *gcePersistentDiskBuilder, sdBeforeSet sets.String) (
 		}

 		if numRetries > 0 {
-			glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", b.pdName)
+			glog.Warningf("Retrying attach for GCE PD %q (retry count=%v).", b.pdName, numRetries)
 		}

 		if err := gceCloud.AttachDisk(b.pdName, b.readOnly); err != nil {
-			// Retry on error. See issue #11321.
 			glog.Errorf("Error attaching PD %q: %v", b.pdName, err)
 			time.Sleep(errorSleepDuration)
 			continue
@@ -190,33 +178,15 @@ func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, er

 // Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails.
 // This function is intended to be called asynchronously as a go routine.
-// It starts the detachCleanupManager with the specified pdName so that callers can wait for completion.
 func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
-	glog.V(5).Infof("detachDiskAndVerify for pd %q.", c.pdName)
+	glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.pdName)
 	defer util.HandleCrash()

-	// Start operation, so that other threads can wait on this detach operation.
-	// Set bufferSize to 0 so senders are blocked on send until we receive.
-	ch, err := detachCleanupManager.Start(c.pdName, 0 /* bufferSize */)
-	if err != nil {
-		glog.Errorf("Error adding %q to detachCleanupManager: %v", c.pdName, err)
-		return
-	}
-
-	defer detachCleanupManager.Close(c.pdName)
-
-	defer func() {
-		// Unblock any callers that have been waiting for this detach routine to complete.
-		for {
-			select {
-			case <-ch:
-				glog.V(5).Infof("detachDiskAndVerify for pd %q clearing chan.", c.pdName)
-			default:
-				glog.V(5).Infof("detachDiskAndVerify for pd %q done clearing chans.", c.pdName)
-				return
-			}
-		}
-	}()
+	// Block execution until any pending attach/detach operations for this PD have completed
+	attachDetachMutex.LockKey(c.pdName)
+	defer attachDetachMutex.UnlockKey(c.pdName)
+
+	glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.pdName)

 	devicePaths := getDiskByIdPaths(c.gcePersistentDisk)
 	var gceCloud *gcecloud.GCECloud
@@ -233,13 +203,13 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 		}

 		if numRetries > 0 {
-			glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", c.pdName)
+			glog.Warningf("Retrying detach for GCE PD %q (retry count=%v).", c.pdName, numRetries)
 		}

 		if err := gceCloud.DetachDisk(c.pdName); err != nil {
-			// Retry on error. See issue #11321. Continue and verify if disk is detached, because a
-			// previous detach operation may still succeed.
 			glog.Errorf("Error detaching PD %q: %v", c.pdName, err)
+			time.Sleep(errorSleepDuration)
+			continue
 		}

 		for numChecks := 0; numChecks < maxChecks; numChecks++ {
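attachDiskAndVerify and detachDiskAndVerify now share the same shape: a bounded retry loop that logs the failure, sleeps errorSleepDuration, retries the cloud call, and only reports success after a separate verification step confirms the disk state. A generic sketch of that retry-then-verify pattern follows (all names here, such as retryThenVerify, doOperation, and verify, are illustrative and not functions from this file):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryThenVerify tries the operation up to maxRetries times, sleeping between
// failed attempts, and only returns nil once verification confirms the result.
func retryThenVerify(maxRetries int, sleep time.Duration,
	doOperation func() error, verify func() bool) error {
	for i := 0; i < maxRetries; i++ {
		if i > 0 {
			fmt.Printf("retrying (retry count=%v)\n", i)
		}
		if err := doOperation(); err != nil {
			fmt.Printf("operation failed: %v\n", err)
			time.Sleep(sleep)
			continue
		}
		if verify() {
			return nil
		}
	}
	return errors.New("operation did not reach the desired state")
}

func main() {
	attempts := 0
	err := retryThenVerify(3, 10*time.Millisecond,
		func() error { // pretend cloud call: fails once, then succeeds
			attempts++
			if attempts == 1 {
				return errors.New("transient cloud error")
			}
			return nil
		},
		func() bool { return true }, // pretend verification passes
	)
	fmt.Println(err) // <nil>
}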
@@ -249,6 +219,7 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 				glog.Errorf("Error verifying GCE PD (%q) is detached: %v", c.pdName, err)
 			} else if allPathsRemoved {
 				// All paths to the PD have been succefully removed
+				unmountPDAndRemoveGlobalPath(c)
 				glog.Infof("Successfully detached GCE PD %q.", c.pdName)
 				return
 			}
@@ -263,6 +234,15 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 	glog.Errorf("Failed to detach GCE PD %q. One or more mount paths was not removed.", c.pdName)
 }

+// Unmount the global PD mount, which should be the only one, and delete it.
+func unmountPDAndRemoveGlobalPath(c *gcePersistentDiskCleaner) error {
+	globalPDPath := makeGlobalPDName(c.plugin.host, c.pdName)
+
+	err := c.mounter.Unmount(globalPDPath)
+	os.Remove(globalPDPath)
+	return err
+}
+
 // Returns the first path that exists, or empty string if none exist.
 func verifyAllPathsRemoved(devicePaths []string) (bool, error) {
 	allPathsRemoved := true