diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go
index bd33d7a27d1..4898070145e 100644
--- a/pkg/cloudprovider/providers/gce/gce.go
+++ b/pkg/cloudprovider/providers/gce/gce.go
@@ -55,6 +55,9 @@ const (
 	gceAffinityTypeClientIP = "CLIENT_IP"
 	// AffinityTypeClientIPProto - affinity based on Client IP and port.
 	gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO"
+
+	operationPollInterval        = 3 * time.Second
+	operationPollTimeoutDuration = 30 * time.Minute
 )
 
 // GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
@@ -259,48 +262,57 @@ func (gce *GCECloud) targetPoolURL(name, region string) string {
 	return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
 }
 
-func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, error)) error {
-	pollOp := op
-	consecPollFails := 0
-	for pollOp.Status != "DONE" {
-		var err error
-		time.Sleep(3 * time.Second)
-		pollOp, err = getOperation()
-		if err != nil {
-			if consecPollFails == 2 {
-				// Only bail if we've seen 3 consecutive polling errors.
-				return err
-			}
-			consecPollFails++
-		} else {
-			consecPollFails = 0
-		}
+func waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error {
+	if op == nil {
+		return fmt.Errorf("operation must not be nil")
 	}
-	if pollOp.Error != nil && len(pollOp.Error.Errors) > 0 {
-		return &googleapi.Error{
-			Code:    int(pollOp.HttpErrorStatusCode),
-			Message: pollOp.Error.Errors[0].Message,
-		}
-	}
-	return nil
+
+	if opIsDone(op) {
+		return getErrorFromOp(op)
+	}
+
+	opName := op.Name
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		pollOp, err := getOperation(opName)
+		if err != nil {
+			glog.Warningf("GCE poll operation failed: %v", err)
+		}
+		return opIsDone(pollOp), getErrorFromOp(pollOp)
+	})
+}
+
+func opIsDone(op *compute.Operation) bool {
+	return op != nil && op.Status == "DONE"
+}
+
+func getErrorFromOp(op *compute.Operation) error {
+	if op != nil && op.Error != nil && len(op.Error.Errors) > 0 {
+		err := &googleapi.Error{
+			Code:    int(op.HttpErrorStatusCode),
+			Message: op.Error.Errors[0].Message,
+		}
+		glog.Errorf("GCE operation failed: %v", err)
+		return err
+	}
+
+	return nil
 }
 
 func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error {
-	return waitForOp(op, func() (*compute.Operation, error) {
-		return gce.service.GlobalOperations.Get(gce.projectID, op.Name).Do()
+	return waitForOp(op, func(operationName string) (*compute.Operation, error) {
+		return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do()
 	})
 }
 
 func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
-	return waitForOp(op, func() (*compute.Operation, error) {
-		return gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do()
+	return waitForOp(op, func(operationName string) (*compute.Operation, error) {
+		return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do()
 	})
 }
 
 func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
-	return waitForOp(op, func() (*compute.Operation, error) {
-		return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, op.Name).Do()
+	return waitForOp(op, func(operationName string) (*compute.Operation, error) {
+		return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, operationName).Do()
 	})
 }
 
@@ -1457,18 +1469,7 @@ func (gce *GCECloud) AttachDisk(diskName string,
 	readOnly bool) error {
 	attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, gce.zone, gce.instanceID, attachedDisk).Do()
 	if err != nil {
-		// Check if the disk is already attached to this instance. We do this only
-		// in the error case, since it is expected to be exceptional.
-		instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, gce.instanceID).Do()
-		if err != nil {
-			return err
-		}
-		for _, disk := range instance.Disks {
-			if disk.Source == attachedDisk.Source {
-				// Disk is already attached, we're good to go.
-				return nil
-			}
-		}
+		return err
 	}
 
 	return gce.waitForZoneOp(attachOp)
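
Note on the waitForOp rewrite above: the hand-rolled sleep loop is replaced by wait.Poll from k8s.io/kubernetes/pkg/util/wait, with the two new constants used as interval and timeout. The standalone sketch below (fetchOperationStatus is a hypothetical stand-in for the GCE operations Get call, not code from this patch) illustrates the contract of the condition function: return (true, nil) to stop successfully, (false, nil) to keep polling, or a non-nil error to abort.

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

// fetchOperationStatus is a hypothetical stand-in for
// gce.service.ZoneOperations.Get(...).Do().
func fetchOperationStatus() (string, error) {
	return "DONE", nil
}

func waitUntilDone() error {
	// Poll every 3 seconds, give up after 30 minutes (the same values the
	// patch introduces as operationPollInterval/operationPollTimeoutDuration).
	return wait.Poll(3*time.Second, 30*time.Minute, func() (bool, error) {
		status, err := fetchOperationStatus()
		if err != nil {
			// Tolerate transient polling errors and try again, mirroring the
			// Warningf-and-continue behavior in waitForOp.
			return false, nil
		}
		return status == "DONE", nil
	})
}

func main() {
	fmt.Println(waitUntilDone()) // prints <nil> once the operation reports DONE
}
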
diff --git a/pkg/util/keymutex/keymutex.go b/pkg/util/keymutex/keymutex.go
new file mode 100644
index 00000000000..bb01a019bae
--- /dev/null
+++ b/pkg/util/keymutex/keymutex.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package keymutex
+
+import (
+	"fmt"
+	"github.com/golang/glog"
+	"sync"
+)
+
+// KeyMutex is a thread-safe interface for acquiring locks on arbitrary strings.
+type KeyMutex interface {
+	// Acquires a lock associated with the specified ID; creates the lock if one doesn't already exist.
+	LockKey(id string)
+
+	// Releases the lock associated with the specified ID.
+	// Returns an error if the specified ID doesn't exist.
+	UnlockKey(id string) error
+}
+
+// Returns a new instance of a key mutex.
+func NewKeyMutex() KeyMutex {
+	return &keyMutex{
+		mutexMap: make(map[string]*sync.Mutex),
+	}
+}
+
+type keyMutex struct {
+	sync.RWMutex
+	mutexMap map[string]*sync.Mutex
+}
+
+// Acquires a lock associated with the specified ID (creates the lock if one doesn't already exist).
+func (km *keyMutex) LockKey(id string) {
+	glog.V(5).Infof("LockKey(...) called for id %q\r\n", id)
+	mutex := km.getOrCreateLock(id)
+	mutex.Lock()
+	glog.V(5).Infof("LockKey(...) for id %q completed.\r\n", id)
+}
+
+// Releases the lock associated with the specified ID.
+// Returns an error if the specified ID doesn't exist.
+func (km *keyMutex) UnlockKey(id string) error {
+	glog.V(5).Infof("UnlockKey(...) called for id %q\r\n", id)
+	km.RLock()
+	defer km.RUnlock()
+	mutex, exists := km.mutexMap[id]
+	if !exists {
+		return fmt.Errorf("id %q not found", id)
+	}
+	glog.V(5).Infof("UnlockKey(...) for id %q. Mutex found, trying to unlock it.\r\n", id)
+
+	mutex.Unlock()
+	glog.V(5).Infof("UnlockKey(...) for id %q completed.\r\n", id)
+	return nil
+}
+
+// Returns the lock associated with the specified ID, or creates the lock if one doesn't already exist.
+func (km *keyMutex) getOrCreateLock(id string) *sync.Mutex {
+	km.Lock()
+	defer km.Unlock()
+
+	if _, exists := km.mutexMap[id]; !exists {
+		km.mutexMap[id] = &sync.Mutex{}
+	}
+
+	return km.mutexMap[id]
+}
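
The keymutex package added above gives each string key its own sync.Mutex, so operations keyed on the same ID serialize while operations on different IDs proceed independently. A minimal usage sketch (the serializeWork helper and the disk names are illustrative, not part of the patch):

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/keymutex"
)

var pdLock = keymutex.NewKeyMutex()

// serializeWork holds the per-key lock for the duration of its critical section.
func serializeWork(pdName string, done chan<- string) {
	pdLock.LockKey(pdName)
	defer pdLock.UnlockKey(pdName)

	time.Sleep(10 * time.Millisecond) // stand-in for attach/detach work
	done <- pdName
}

func main() {
	done := make(chan string, 3)
	go serializeWork("disk-a", done) // first holder of the "disk-a" lock
	go serializeWork("disk-a", done) // waits for the first call to finish
	go serializeWork("disk-b", done) // different key, runs concurrently

	for i := 0; i < 3; i++ {
		fmt.Println("finished:", <-done)
	}
}
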
diff --git a/pkg/util/keymutex/keymutex_test.go b/pkg/util/keymutex/keymutex_test.go
new file mode 100644
index 00000000000..faa3be16aad
--- /dev/null
+++ b/pkg/util/keymutex/keymutex_test.go
@@ -0,0 +1,111 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package keymutex
+
+import (
+	"testing"
+	"time"
+)
+
+const (
+	callbackTimeout = 1 * time.Second
+)
+
+func Test_SingleLock_NoUnlock(t *testing.T) {
+	// Arrange
+	km := NewKeyMutex()
+	key := "fakeid"
+	callbackCh := make(chan interface{})
+
+	// Act
+	go lockAndCallback(km, key, callbackCh)
+
+	// Assert
+	verifyCallbackHappens(t, callbackCh)
+}
+
+func Test_SingleLock_SingleUnlock(t *testing.T) {
+	// Arrange
+	km := NewKeyMutex()
+	key := "fakeid"
+	callbackCh := make(chan interface{})
+
+	// Act & Assert
+	go lockAndCallback(km, key, callbackCh)
+	verifyCallbackHappens(t, callbackCh)
+	km.UnlockKey(key)
+}
+
+func Test_DoubleLock_DoubleUnlock(t *testing.T) {
+	// Arrange
+	km := NewKeyMutex()
+	key := "fakeid"
+	callbackCh1stLock := make(chan interface{})
+	callbackCh2ndLock := make(chan interface{})
+
+	// Act & Assert
+	go lockAndCallback(km, key, callbackCh1stLock)
+	verifyCallbackHappens(t, callbackCh1stLock)
+	go lockAndCallback(km, key, callbackCh2ndLock)
+	verifyCallbackDoesntHappens(t, callbackCh2ndLock)
+	km.UnlockKey(key)
+	verifyCallbackHappens(t, callbackCh2ndLock)
+	km.UnlockKey(key)
+}
+
+func lockAndCallback(km KeyMutex, id string, callbackCh chan<- interface{}) {
+	km.LockKey(id)
+	callbackCh <- true
+}
+
+func verifyCallbackHappens(t *testing.T, callbackCh <-chan interface{}) bool {
+	select {
+	case <-callbackCh:
+		return true
+	case <-time.After(callbackTimeout):
+		t.Fatalf("Timed out waiting for callback.")
+		return false
+	}
+}
+
+func verifyCallbackDoesntHappens(t *testing.T, callbackCh <-chan interface{}) bool {
+	select {
+	case <-callbackCh:
+		t.Fatalf("Unexpected callback.")
+		return false
+	case <-time.After(callbackTimeout):
+		return true
+	}
+}
+
+func verifyNoError(t *testing.T, err error, name string) {
+	if err != nil {
+		t.Fatalf("Unexpected response on %q. Expected: <no error> Actual: <%v>", name, err)
+	}
+}
+
+func verifyError(t *testing.T, err error, name string) {
+	if err == nil {
+		t.Fatalf("Unexpected response on %q. Expected: <error> Actual: <no error>", name)
+	}
+}
+
+func verifyMsg(t *testing.T, expected, actual string) {
+	if actual != expected {
+		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", expected, actual)
+	}
+}
diff --git a/pkg/util/operationmanager/operationmanager.go b/pkg/util/operationmanager/operationmanager.go
deleted file mode 100644
index 39ff248573d..00000000000
--- a/pkg/util/operationmanager/operationmanager.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package operationmanager
-
-import (
-	"fmt"
-	"sync"
-)
-
-// Operation Manager is a thread-safe interface for keeping track of multiple pending async operations.
-type OperationManager interface {
-	// Called when the operation with the given ID has started.
-	// Creates a new channel with specified buffer size tracked with the specified ID.
-	// Returns a read-only version of the newly created channel.
-	// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
-	Start(id string, bufferSize uint) (<-chan interface{}, error)
-
-	// Called when the operation with the given ID has terminated.
-	// Closes and removes the channel associated with ID.
-	// Returns an error if no associated channel exists.
-	Close(id string) error
-
-	// Attempts to send msg to the channel associated with ID.
-	// Returns an error if no associated channel exists.
-	Send(id string, msg interface{}) error
-
-	// Returns true if an entry with the specified ID already exists.
-	Exists(id string) bool
-}
-
-// Returns a new instance of a channel manager.
-func NewOperationManager() OperationManager {
-	return &operationManager{
-		chanMap: make(map[string]chan interface{}),
-	}
-}
-
-type operationManager struct {
-	sync.RWMutex
-	chanMap map[string]chan interface{}
-}
-
-// Called when the operation with the given ID has started.
-// Creates a new channel with specified buffer size tracked with the specified ID.
-// Returns a read-only version of the newly created channel.
-// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
-func (cm *operationManager) Start(id string, bufferSize uint) (<-chan interface{}, error) {
-	cm.Lock()
-	defer cm.Unlock()
-	if _, exists := cm.chanMap[id]; exists {
-		return nil, fmt.Errorf("id %q already exists", id)
-	}
-	cm.chanMap[id] = make(chan interface{}, bufferSize)
-	return cm.chanMap[id], nil
-}
-
-// Called when the operation with the given ID has terminated.
-// Closes and removes the channel associated with ID.
-// Returns an error if no associated channel exists.
-func (cm *operationManager) Close(id string) error {
-	cm.Lock()
-	defer cm.Unlock()
-	if _, exists := cm.chanMap[id]; !exists {
-		return fmt.Errorf("id %q not found", id)
-	}
-	close(cm.chanMap[id])
-	delete(cm.chanMap, id)
-	return nil
-}
-
-// Attempts to send msg to the channel associated with ID.
-// Returns an error if no associated channel exists.
-func (cm *operationManager) Send(id string, msg interface{}) error {
-	cm.RLock()
-	defer cm.RUnlock()
-	if _, exists := cm.chanMap[id]; !exists {
-		return fmt.Errorf("id %q not found", id)
-	}
-	cm.chanMap[id] <- msg
-	return nil
-}
-
-// Returns true if an entry with the specified ID already exists.
-func (cm *operationManager) Exists(id string) (exists bool) {
-	cm.RLock()
-	defer cm.RUnlock()
-	_, exists = cm.chanMap[id]
-	return
-}
diff --git a/pkg/util/operationmanager/operationmanager_test.go b/pkg/util/operationmanager/operationmanager_test.go
deleted file mode 100644
index ac1d4db34c4..00000000000
--- a/pkg/util/operationmanager/operationmanager_test.go
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Channel Manager keeps track of multiple channels
-package operationmanager
-
-import (
-	"testing"
-)
-
-func TestStart(t *testing.T) {
-	// Arrange
-	cm := NewOperationManager()
-	chanId := "testChanId"
-	testMsg := "test message"
-
-	// Act
-	ch, startErr := cm.Start(chanId, 1 /* bufferSize */)
-	sigErr := cm.Send(chanId, testMsg)
-
-	// Assert
-	verifyNoError(t, startErr, "Start")
-	verifyNoError(t, sigErr, "Send")
-	actualMsg := <-ch
-	verifyMsg(t, testMsg /* expected */, actualMsg.(string) /* actual */)
-}
-
-func TestStartIdExists(t *testing.T) {
-	// Arrange
-	cm := NewOperationManager()
-	chanId := "testChanId"
-
-	// Act
-	_, startErr1 := cm.Start(chanId, 1 /* bufferSize */)
-	_, startErr2 := cm.Start(chanId, 1 /* bufferSize */)
-
-	// Assert
-	verifyNoError(t, startErr1, "Start1")
-	verifyError(t, startErr2, "Start2")
-}
-
-func TestStartAndAdd2Chans(t *testing.T) {
-	// Arrange
-	cm := NewOperationManager()
-	chanId1 := "testChanId1"
-	chanId2 := "testChanId2"
-	testMsg1 := "test message 1"
-	testMsg2 := "test message 2"
-
-	// Act
-	ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
-	ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
-	sigErr1 := cm.Send(chanId1, testMsg1)
-	sigErr2 := cm.Send(chanId2, testMsg2)
-
-	// Assert
-	verifyNoError(t, startErr1, "Start1")
-	verifyNoError(t, startErr2, "Start2")
-	verifyNoError(t, sigErr1, "Send1")
-	verifyNoError(t, sigErr2, "Send2")
-	actualMsg1 := <-ch1
-	actualMsg2 := <-ch2
-	verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
-	verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
-}
-
-func TestStartAndAdd2ChansAndClose(t *testing.T) {
-	// Arrange
-	cm := NewOperationManager()
-	chanId1 := "testChanId1"
-	chanId2 := "testChanId2"
-	testMsg1 := "test message 1"
-	testMsg2 := "test message 2"
-
-	// Act
-	ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
-	ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
-	sigErr1 := cm.Send(chanId1, testMsg1)
-	sigErr2 := cm.Send(chanId2, testMsg2)
-	cm.Close(chanId1)
-	sigErr3 := cm.Send(chanId1, testMsg1)
-
-	// Assert
-	verifyNoError(t, startErr1, "Start1")
-	verifyNoError(t, startErr2, "Start2")
-	verifyNoError(t, sigErr1, "Send1")
-	verifyNoError(t, sigErr2, "Send2")
-	verifyError(t, sigErr3, "Send3")
-	actualMsg1 := <-ch1
-	actualMsg2 := <-ch2
-	verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
-	verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
-}
-
-func TestExists(t *testing.T) {
-	// Arrange
-	cm := NewOperationManager()
-	chanId1 := "testChanId1"
-	chanId2 := "testChanId2"
-
-	// Act & Assert
-	verifyExists(t, cm, chanId1, false /* expected */)
-	verifyExists(t, cm, chanId2, false /* expected */)
-
-	_, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
-	verifyNoError(t, startErr1, "Start1")
-	verifyExists(t, cm, chanId1, true /* expected */)
-	verifyExists(t, cm, chanId2, false /* expected */)
-
-	_, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
-	verifyNoError(t, startErr2, "Start2")
-	verifyExists(t, cm, chanId1, true /* expected */)
-	verifyExists(t, cm, chanId2, true /* expected */)
-
-	cm.Close(chanId1)
-	verifyExists(t, cm, chanId1, false /* expected */)
-	verifyExists(t, cm, chanId2, true /* expected */)
-
-	cm.Close(chanId2)
-	verifyExists(t, cm, chanId1, false /* expected */)
-	verifyExists(t, cm, chanId2, false /* expected */)
-}
-
-func verifyExists(t *testing.T, cm OperationManager, id string, expected bool) {
-	if actual := cm.Exists(id); expected != actual {
-		t.Fatalf("Unexpected Exists(%q) response. Expected: <%v> Actual: <%v>", id, expected, actual)
-	}
-}
-
-func verifyNoError(t *testing.T, err error, name string) {
-	if err != nil {
-		t.Fatalf("Unexpected response on %q. Expected: <no error> Actual: <%v>", name, err)
-	}
-}
-
-func verifyError(t *testing.T, err error, name string) {
-	if err == nil {
-		t.Fatalf("Unexpected response on %q. Expected: <error> Actual: <no error>", name)
-	}
-}
-
-func verifyMsg(t *testing.T, expected, actual string) {
-	if actual != expected {
-		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", expected, actual)
-	}
-}
diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go
index 9384acabaf6..c43a610bd3d 100644
--- a/pkg/volume/gce_pd/gce_util.go
+++ b/pkg/volume/gce_pd/gce_util.go
@@ -29,7 +29,7 @@ import (
 	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/exec"
-	"k8s.io/kubernetes/pkg/util/operationmanager"
+	"k8s.io/kubernetes/pkg/util/keymutex"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
 
@@ -46,18 +46,21 @@ const (
 	errorSleepDuration = 5 * time.Second
 )
 
-// Singleton operation manager for managing detach clean up go routines
-var detachCleanupManager = operationmanager.NewOperationManager()
+// Singleton key mutex for keeping attach/detach operations for the same PD atomic
+var attachDetachMutex = keymutex.NewKeyMutex()
 
 type GCEDiskUtil struct{}
 
 // Attaches a disk specified by a volume.GCEPersistentDisk to the current kubelet.
 // Mounts the disk to it's global path.
 func (diskUtil *GCEDiskUtil) AttachAndMountDisk(b *gcePersistentDiskBuilder, globalPDPath string) error {
-	glog.V(5).Infof("AttachAndMountDisk(b, %q) where b is %#v\r\n", globalPDPath, b)
+	glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.pdName, globalPDPath)
 
-	// Block execution until any pending detach goroutines for this pd have completed
-	detachCleanupManager.Send(b.pdName, true)
+	// Block execution until any pending detach operations for this PD have completed
+	attachDetachMutex.LockKey(b.pdName)
+	defer attachDetachMutex.UnlockKey(b.pdName)
+
+	glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.pdName, globalPDPath)
 
 	sdBefore, err := filepath.Glob(diskSDPattern)
 	if err != nil {
@@ -98,24 +101,13 @@ func (diskUtil *GCEDiskUtil) AttachAndMountDisk(b *gcePersistentDiskBuilder, glo
 
 // Unmounts the device and detaches the disk from the kubelet's host machine.
 func (util *GCEDiskUtil) DetachDisk(c *gcePersistentDiskCleaner) error {
-	// Unmount the global PD mount, which should be the only one.
-	globalPDPath := makeGlobalPDName(c.plugin.host, c.pdName)
-	glog.V(5).Infof("DetachDisk(c) where c is %#v and the globalPDPath is %q\r\n", c, globalPDPath)
+	glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.pdName)
 
-	if err := c.mounter.Unmount(globalPDPath); err != nil {
-		return err
-	}
-	if err := os.Remove(globalPDPath); err != nil {
-		return err
+	if err := unmountPDAndRemoveGlobalPath(c); err != nil {
+		glog.Errorf("Error unmounting PD %q: %v", c.pdName, err)
 	}
 
-	if detachCleanupManager.Exists(c.pdName) {
-		glog.Warningf("Terminating new DetachDisk call for GCE PD %q. A previous detach call for this PD is still pending.", c.pdName)
-		return nil
-
-	}
-
-	// Detach disk, retry if needed.
+	// Detach disk asynchronously so that the kubelet sync loop is not blocked.
 	go detachDiskAndVerify(c)
 	return nil
 }
@@ -125,9 +117,6 @@ func attachDiskAndVerify(b *gcePersistentDiskBuilder, sdBeforeSet sets.String) (
 	devicePaths := getDiskByIdPaths(b.gcePersistentDisk)
 	var gceCloud *gcecloud.GCECloud
 	for numRetries := 0; numRetries < maxRetries; numRetries++ {
-		// Block execution until any pending detach goroutines for this pd have completed
-		detachCleanupManager.Send(b.pdName, true)
-
 		var err error
 		if gceCloud == nil {
 			gceCloud, err = getCloudProvider()
@@ -140,11 +129,10 @@
 		}
 
 		if numRetries > 0 {
-			glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", b.pdName)
+			glog.Warningf("Retrying attach for GCE PD %q (retry count=%v).", b.pdName, numRetries)
 		}
 
 		if err := gceCloud.AttachDisk(b.pdName, b.readOnly); err != nil {
-			// Retry on error. See issue #11321.
 			glog.Errorf("Error attaching PD %q: %v", b.pdName, err)
 			time.Sleep(errorSleepDuration)
 			continue
@@ -190,33 +178,15 @@ func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, er
 
 // Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails.
 // This function is intended to be called asynchronously as a go routine.
-// It starts the detachCleanupManager with the specified pdName so that callers can wait for completion.
 func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
-	glog.V(5).Infof("detachDiskAndVerify for pd %q.", c.pdName)
+	glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.pdName)
 	defer util.HandleCrash()
 
-	// Start operation, so that other threads can wait on this detach operation.
-	// Set bufferSize to 0 so senders are blocked on send until we receive.
-	ch, err := detachCleanupManager.Start(c.pdName, 0 /* bufferSize */)
-	if err != nil {
-		glog.Errorf("Error adding %q to detachCleanupManager: %v", c.pdName, err)
-		return
-	}
+	// Block execution until any pending attach/detach operations for this PD have completed
+	attachDetachMutex.LockKey(c.pdName)
+	defer attachDetachMutex.UnlockKey(c.pdName)
 
-	defer detachCleanupManager.Close(c.pdName)
-
-	defer func() {
-		// Unblock any callers that have been waiting for this detach routine to complete.
-		for {
-			select {
-			case <-ch:
-				glog.V(5).Infof("detachDiskAndVerify for pd %q clearing chan.", c.pdName)
-			default:
-				glog.V(5).Infof("detachDiskAndVerify for pd %q done clearing chans.", c.pdName)
-				return
-			}
-		}
-	}()
+	glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.pdName)
 
 	devicePaths := getDiskByIdPaths(c.gcePersistentDisk)
 	var gceCloud *gcecloud.GCECloud
@@ -233,13 +203,13 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 		}
 
 		if numRetries > 0 {
-			glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", c.pdName)
+			glog.Warningf("Retrying detach for GCE PD %q (retry count=%v).", c.pdName, numRetries)
 		}
 
 		if err := gceCloud.DetachDisk(c.pdName); err != nil {
-			// Retry on error. See issue #11321. Continue and verify if disk is detached, because a
-			// previous detach operation may still succeed.
 			glog.Errorf("Error detaching PD %q: %v", c.pdName, err)
+			time.Sleep(errorSleepDuration)
+			continue
 		}
 
 		for numChecks := 0; numChecks < maxChecks; numChecks++ {
@@ -249,6 +219,7 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 				glog.Errorf("Error verifying GCE PD (%q) is detached: %v", c.pdName, err)
 			} else if allPathsRemoved {
 				// All paths to the PD have been succefully removed
+				unmountPDAndRemoveGlobalPath(c)
 				glog.Infof("Successfully detached GCE PD %q.", c.pdName)
 				return
 			}
@@ -263,6 +234,15 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 	glog.Errorf("Failed to detach GCE PD %q. One or more mount paths was not removed.", c.pdName)
 }
 
+// Unmount the global PD mount, which should be the only one, and delete it.
+func unmountPDAndRemoveGlobalPath(c *gcePersistentDiskCleaner) error {
+	globalPDPath := makeGlobalPDName(c.plugin.host, c.pdName)
+
+	err := c.mounter.Unmount(globalPDPath)
+	os.Remove(globalPDPath)
+	return err
+}
+
 // Returns the first path that exists, or empty string if none exist.
 func verifyAllPathsRemoved(devicePaths []string) (bool, error) {
 	allPathsRemoved := true