From c952ee23a70ad0fbecaa1e2c311225e8a7b5e17c Mon Sep 17 00:00:00 2001
From: saadali
Date: Sun, 21 Jun 2015 19:39:17 -0700
Subject: [PATCH] Work around for PDs stop mounting after a few hours issue

---
 pkg/util/operationmanager/operationmanager.go | 122 +++++++++
 .../operationmanager/operationmanager_test.go | 176 +++++++++++++
 pkg/volume/gce_pd/gce_util.go                 | 243 +++++++++++++++---
 3 files changed, 506 insertions(+), 35 deletions(-)
 create mode 100644 pkg/util/operationmanager/operationmanager.go
 create mode 100644 pkg/util/operationmanager/operationmanager_test.go

diff --git a/pkg/util/operationmanager/operationmanager.go b/pkg/util/operationmanager/operationmanager.go
new file mode 100644
index 00000000000..eb8fed88ed0
--- /dev/null
+++ b/pkg/util/operationmanager/operationmanager.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package operationmanager
+
+import (
+	"fmt"
+	"sync"
+)
+
+// OperationManager is a thread-safe interface for keeping track of multiple pending async operations.
+type OperationManager interface {
+	// Start is called when the operation with the given ID has started.
+	// Creates a new channel with specified buffer size tracked with the specified ID.
+	// Returns a read-only version of the newly created channel.
+	// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
+	Start(id string, bufferSize uint) (<-chan interface{}, error)
+
+	// Close is called when the operation with the given ID has terminated.
+	// Closes and removes the channel associated with ID. Any Send still blocked on that channel returns an error.
+	// Returns an error if no associated channel exists.
+	Close(id string) error
+
+	// Send attempts to send msg to the channel associated with ID. It blocks until the channel accepts msg or is closed.
+	// Returns an error if no associated channel exists, or if the channel is closed before msg is accepted.
+	Send(id string, msg interface{}) error
+}
+
+// NewOperationManager returns a new, empty OperationManager.
+func NewOperationManager() OperationManager {
+	return &operationManager{
+		chanMap: make(map[string]*operation),
+	}
+}
+
+// operation is the state tracked for a single ID.
+type operation struct {
+	// msgs is the channel handed out (receive-only) by Start.
+	msgs chan interface{}
+	// done is closed by Close so that senders blocked on msgs give up.
+	done chan struct{}
+	// senders counts Send calls that may still write to msgs; Close waits for it before closing msgs.
+	senders sync.WaitGroup
+}
+
+type operationManager struct {
+	sync.RWMutex
+	chanMap map[string]*operation
+}
+
+// Start creates a channel with the given buffer size, tracks it under id, and returns its receive-only end.
+// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
+func (cm *operationManager) Start(id string, bufferSize uint) (<-chan interface{}, error) {
+	cm.Lock()
+	defer cm.Unlock()
+	if _, exists := cm.chanMap[id]; exists {
+		return nil, fmt.Errorf("id %q already exists", id)
+	}
+	op := &operation{
+		msgs: make(chan interface{}, bufferSize),
+		done: make(chan struct{}),
+	}
+	cm.chanMap[id] = op
+	return op.msgs, nil
+}
+
+// Close removes the entry for id and closes its channel; messages already buffered remain readable.
+// Returns an error if no associated channel exists.
+func (cm *operationManager) Close(id string) error {
+	cm.Lock()
+	op, exists := cm.chanMap[id]
+	delete(cm.chanMap, id)
+	cm.Unlock()
+	if !exists {
+		return fmt.Errorf("id %q not found", id)
+	}
+
+	// The lock is already released here: a Send blocked on op.msgs must never keep Close (typically
+	// deferred by the receiving goroutine) from completing. Wake such senders, wait for them to leave,
+	// and only then close op.msgs so that none of them can hit a send on a closed channel.
+	close(op.done)
+	op.senders.Wait()
+	close(op.msgs)
+	return nil
+}
+
+// Send delivers msg to the channel tracked under id, blocking until it is accepted or the channel is closed.
+// Returns an error if no associated channel exists, or if Close is called before msg is accepted.
+func (cm *operationManager) Send(id string, msg interface{}) error {
+	cm.RLock()
+	op, exists := cm.chanMap[id]
+	if exists {
+		// Registered under the read lock, so Close (which removes the entry under the write lock) sees it.
+		op.senders.Add(1)
+	}
+	cm.RUnlock()
+	if !exists {
+		return fmt.Errorf("id %q not found", id)
+	}
+	defer op.senders.Done()
+
+	// Block without holding the lock; see Close.
+	select {
+	case op.msgs <- msg:
+		return nil
+	case <-op.done:
+		return fmt.Errorf("id %q closed before message was received", id)
+	}
+}
diff --git a/pkg/util/operationmanager/operationmanager_test.go b/pkg/util/operationmanager/operationmanager_test.go
new file mode 100644
index 00000000000..d19f4f3fe56
--- /dev/null
+++ b/pkg/util/operationmanager/operationmanager_test.go
@@ -0,0 +1,176 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Tests for the operation manager, which keeps track of one channel per pending operation.
+package operationmanager
+
+import (
+	"testing"
+	"time"
+)
+
+func TestStart(t *testing.T) {
+	// Arrange
+	cm := NewOperationManager()
+	chanId := "testChanId"
+	testMsg := "test message"
+
+	// Act
+	ch, startErr := cm.Start(chanId, 1 /* bufferSize */)
+	sigErr := cm.Send(chanId, testMsg)
+
+	// Assert
+	if startErr != nil {
+		t.Fatalf("Unexpected error on Start. Expected: <no error> Actual: <%v>", startErr)
+	}
+	if sigErr != nil {
+		t.Fatalf("Unexpected error on Send. Expected: <no error> Actual: <%v>", sigErr)
+	}
+	if actual := <-ch; actual != testMsg {
+		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg, actual)
+	}
+}
+
+func TestStartIdExists(t *testing.T) {
+	// Arrange
+	cm := NewOperationManager()
+	chanId := "testChanId"
+
+	// Act
+	_, startErr1 := cm.Start(chanId, 1 /* bufferSize */)
+	_, startErr2 := cm.Start(chanId, 1 /* bufferSize */)
+
+	// Assert
+	if startErr1 != nil {
+		t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
+	}
+	if startErr2 == nil {
+		t.Fatalf("Expected error on Start2. Expected: <error> Actual: <no error>")
+	}
+}
+
+func TestStartAndAdd2Chans(t *testing.T) {
+	// Arrange
+	cm := NewOperationManager()
+	chanId1 := "testChanId1"
+	chanId2 := "testChanId2"
+	testMsg1 := "test message 1"
+	testMsg2 := "test message 2"
+
+	// Act
+	ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
+	ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
+	sigErr1 := cm.Send(chanId1, testMsg1)
+	sigErr2 := cm.Send(chanId2, testMsg2)
+
+	// Assert
+	if startErr1 != nil {
+		t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
+	}
+	if startErr2 != nil {
+		t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
+	}
+	if sigErr1 != nil {
+		t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
+	}
+	if sigErr2 != nil {
+		t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
+	}
+	if actual := <-ch1; actual != testMsg1 {
+		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
+	}
+	if actual := <-ch2; actual != testMsg2 {
+		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
+	}
+
+}
+
+func TestStartAndAdd2ChansAndClose(t *testing.T) {
+	// Arrange
+	cm := NewOperationManager()
+	chanId1 := "testChanId1"
+	chanId2 := "testChanId2"
+	testMsg1 := "test message 1"
+	testMsg2 := "test message 2"
+
+	// Act
+	ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
+	ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
+	sigErr1 := cm.Send(chanId1, testMsg1)
+	sigErr2 := cm.Send(chanId2, testMsg2)
+	cm.Close(chanId1)
+	sigErr3 := cm.Send(chanId1, testMsg1)
+
+	// Assert
+	if startErr1 != nil {
+		t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
+	}
+	if startErr2 != nil {
+		t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
+	}
+	if sigErr1 != nil {
+		t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
+	}
+	if sigErr2 != nil {
+		t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
+	}
+	if sigErr3 == nil {
+		t.Fatalf("Expected error on Send3. Expected: <error> Actual: <no error>")
+	}
+	if actual := <-ch1; actual != testMsg1 {
+		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
+	}
+	if actual := <-ch2; actual != testMsg2 {
+		t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
+	}
+
+}
+
+func TestCloseWithBlockedSend(t *testing.T) {
+	// Arrange
+	cm := NewOperationManager()
+	chanId := "testChanId"
+	sendErrCh := make(chan error, 1)
+	closeErrCh := make(chan error, 1)
+
+	// Act
+	_, startErr := cm.Start(chanId, 0 /* bufferSize */)
+	go func() { sendErrCh <- cm.Send(chanId, "test message") }()
+	// Give the sender time to block on the unbuffered channel, which is never received from.
+	time.Sleep(50 * time.Millisecond)
+	go func() { closeErrCh <- cm.Close(chanId) }()
+
+	// Assert
+	if startErr != nil {
+		t.Fatalf("Unexpected error on Start. Expected: <no error> Actual: <%v>", startErr)
+	}
+	select {
+	case closeErr := <-closeErrCh:
+		if closeErr != nil {
+			t.Fatalf("Unexpected error on Close. Expected: <no error> Actual: <%v>", closeErr)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Close did not return while a Send was blocked. Expected: <no error> Actual: <timeout>")
+	}
+	select {
+	case sendErr := <-sendErrCh:
+		if sendErr == nil {
+			t.Fatalf("Expected error on Send. Expected: <error> Actual: <no error>")
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Send did not return after Close. Expected: <error> Actual: <timeout>")
+	}
+}
diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go
index 44dba25c180..e855cb886d7 100644
--- a/pkg/volume/gce_pd/gce_util.go
+++ b/pkg/volume/gce_pd/gce_util.go
@@ -17,65 +17,63 @@ limitations under the License.
 package gce_pd
 
 import (
-	"errors"
 	"fmt"
 	"os"
 	"path"
+	"path/filepath"
+	"strings"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/operationmanager"
 	"github.com/golang/glog"
 )
 
+const (
+	diskByIdPath         = "/dev/disk/by-id/"
+	diskGooglePrefix     = "google-"
+	diskScsiGooglePrefix = "scsi-0Google_PersistentDisk_"
+	diskPartitionSuffix  = "-part"
+	diskSDPath           = "/dev/sd"
+	diskSDPattern        = "/dev/sd*"
+	maxChecks            = 10
+	maxRetries           = 10
+	checkSleepDuration   = time.Second
+)
+
+// Singleton operation manager for managing detach clean up go routines (one entry per PD name, registered by verifyDetached).
+var detachCleanupManager = operationmanager.NewOperationManager()
+
 type GCEDiskUtil struct{}
 
 // Attaches a disk specified by a volume.GCEPersistentDisk to the current kubelet.
 // Mounts the disk to it's global path.
-func (util *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath string) error {
+func (diskUtil *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath string) error {
+	glog.V(5).Infof("AttachAndMountDisk(pd, %q) where pd is %#v\r\n", globalPDPath, pd)
+	// Terminate any in progress verify detach go routine. This blocks until that go routine receives the message or exits, because the channel is unbuffered. The error is deliberately ignored: it only reports that there is no go routine left to terminate.
+	detachCleanupManager.Send(pd.pdName, true)
+	sdBefore, err := filepath.Glob(diskSDPattern)
+	if err != nil {
+		glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
+	}
+	sdBeforeSet := util.NewStringSet(sdBefore...)
+
 	gce, err := cloudprovider.GetCloudProvider("gce", nil)
 	if err != nil {
 		return err
 	}
+
 	if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil {
 		return err
 	}
 
-	devicePaths := []string{
-		path.Join("/dev/disk/by-id/", "google-"+pd.pdName),
-		path.Join("/dev/disk/by-id/", "scsi-0Google_PersistentDisk_"+pd.pdName),
-	}
-
-	if pd.partition != "" {
-		for i, path := range devicePaths {
-			devicePaths[i] = path + "-part" + pd.partition
-		}
-	}
-	//TODO(jonesdl) There should probably be better method than busy-waiting here.
-	numTries := 0
-	devicePath := ""
-	// Wait for the disk device to be created
-	for {
-		for _, path := range devicePaths {
-			_, err := os.Stat(path)
-			if err == nil {
-				devicePath = path
-				break
-			}
-			if err != nil && !os.IsNotExist(err) {
-				return err
-			}
-		}
-		if devicePath != "" {
-			break
-		}
-		numTries++
-		if numTries == 10 {
-			return errors.New("Could not attach disk: Timeout after 10s")
-		}
-		time.Sleep(time.Second)
+	devicePath, err := verifyAttached(pd, sdBeforeSet, gce)
+	if err != nil {
+		return err
 	}
 
 	// Only mount the PD globally once.
@@ -108,6 +106,11 @@ func (util *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath
 func (util *GCEDiskUtil) DetachDisk(pd *gcePersistentDisk) error {
 	// Unmount the global PD mount, which should be the only one.
 	globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName)
+	glog.V(5).Infof("DetachDisk(pd) where pd is %#v and the globalPDPath is %q\r\n", pd, globalPDPath)
+
+	// Terminate any in progress verify detach go routine. This blocks until that go routine receives the message or exits, because the channel is unbuffered. The error is deliberately ignored: it only reports that there is no go routine left to terminate.
+	detachCleanupManager.Send(pd.pdName, true)
+
 	if err := pd.mounter.Unmount(globalPDPath); err != nil {
 		return err
 	}
@@ -122,6 +125,176 @@ func (util *GCEDiskUtil) DetachDisk(pd *gcePersistentDisk) error {
 	if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil {
 		return err
 	}
+
+	// Verify disk detached, retry if needed.
+	go verifyDetached(pd, gce)
+	return nil
+}
+
+// Verifies that the disk device has been successfully attached (one of its /dev/disk/by-id paths exists), and retries the attach if it has not. Returns the device path that was found.
+func verifyAttached(pd *gcePersistentDisk, sdBeforeSet util.StringSet, gce cloudprovider.Interface) (string, error) {
+	devicePaths := getDiskByIdPaths(pd)
+	for numRetries := 0; numRetries < maxRetries; numRetries++ {
+		for numChecks := 0; numChecks < maxChecks; numChecks++ {
+			if err := udevadmChangeToNewDrives(sdBeforeSet); err != nil {
+				// udevadm errors should not block disk attachment, log and continue
+				glog.Errorf("%v", err)
+			}
+
+			for _, path := range devicePaths {
+				if exists, err := pathExists(path); err != nil {
+					return "", err
+				} else if exists {
+					// A device path has successfully been created for the PD
+					glog.V(5).Infof("Succesfully attached GCE PD %q.", pd.pdName)
+					return path, nil
+				}
+			}
+
+			// Sleep then check again
+			glog.V(5).Infof("Waiting for GCE PD %q to attach.", pd.pdName)
+			time.Sleep(checkSleepDuration)
+		}
+
+		// Try attaching the disk again
+		glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", pd.pdName)
+		if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil {
+			return "", err
+		}
+	}
+
+	return "", fmt.Errorf("Could not attach GCE PD %q. Timeout waiting for mount paths to be created.", pd.pdName)
+}
+
+// Verifies that the specified persistent disk device has been successfully detached (none of its /dev/disk/by-id paths exist), and retries the detach if it has not.
+// This function is intended to be called asynchronously as a go routine.
+func verifyDetached(pd *gcePersistentDisk, gce cloudprovider.Interface) {
+	defer util.HandleCrash()
+
+	// Setting bufferSize to 0 so that when senders send, they are blocked until we receive. This avoids the need to have a separate exit check.
+	ch, err := detachCleanupManager.Start(pd.pdName, 0 /* bufferSize */)
+	if err != nil {
+		glog.Errorf("Error adding %q to detachCleanupManager: %v", pd.pdName, err)
+		return
+	}
+	defer detachCleanupManager.Close(pd.pdName)
+
+	devicePaths := getDiskByIdPaths(pd)
+	for numRetries := 0; numRetries < maxRetries; numRetries++ {
+		for numChecks := 0; numChecks < maxChecks; numChecks++ {
+			select {
+			case <-ch:
+				glog.Warningf("Terminating GCE PD %q detach verification. Another attach/detach call was made for this PD.", pd.pdName)
+				return
+			default:
+				allPathsRemoved := true
+				for _, path := range devicePaths {
+					if err := udevadmChangeToDrive(path); err != nil {
+						// udevadm errors should not block disk detachment, log and continue
+						glog.Errorf("%v", err)
+					}
+					if exists, err := pathExists(path); err != nil {
+						glog.Errorf("Error check path: %v", err)
+						return
+					} else {
+						allPathsRemoved = allPathsRemoved && !exists
+					}
+				}
+				if allPathsRemoved {
+					// All paths to the PD have been successfully removed
+					glog.V(5).Infof("Succesfully detached GCE PD %q.", pd.pdName)
+					return
+				}
+
+				// Sleep then check again
+				glog.V(5).Infof("Waiting for GCE PD %q to detach.", pd.pdName)
+				time.Sleep(checkSleepDuration)
+			}
+		}
+
+		// Try detaching disk again
+		glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", pd.pdName)
+		if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil {
+			glog.Errorf("Error on retry detach PD %q: %v", pd.pdName, err)
+			return
+		}
+	}
+
+	glog.Errorf("Could not detach GCE PD %q. One or more mount paths was not removed.", pd.pdName)
+}
+
+// Returns list of all /dev/disk/by-id/* paths for given PD.
+func getDiskByIdPaths(pd *gcePersistentDisk) []string {
+	devicePaths := []string{
+		path.Join(diskByIdPath, diskGooglePrefix+pd.pdName),
+		path.Join(diskByIdPath, diskScsiGooglePrefix+pd.pdName),
+	}
+
+	if pd.partition != "" {
+		for i, path := range devicePaths {
+			devicePaths[i] = path + diskPartitionSuffix + pd.partition
+		}
+	}
+
+	return devicePaths
+}
+
+// pathExists reports whether path exists. A missing path is not an error; any other os.Stat failure is returned.
+func pathExists(path string) (bool, error) {
+	if _, err := os.Stat(path); err == nil {
+		return true, nil
+	} else if !os.IsNotExist(err) {
+		return false, err
+	}
+	return false, nil
+}
+
+// udevadmChangeToNewDrives calls "udevadm trigger --action=change" for every "/dev/sd*" drive that is not in sdBeforeSet (i.e. newly created). A failure on one drive does not stop the others; the first error is returned.
+// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
+func udevadmChangeToNewDrives(sdBeforeSet util.StringSet) error {
+	sdAfter, err := filepath.Glob(diskSDPattern)
+	if err != nil {
+		return fmt.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
+	}
+
+	var firstErr error
+	for _, sd := range sdAfter {
+		if !sdBeforeSet.Has(sd) {
+			if err := udevadmChangeToDrive(sd); err != nil && firstErr == nil {
+				firstErr = err
+			}
+		}
+	}
+	return firstErr
+}
+
+// Calls "udevadm trigger --action=change" on the specified drive.
+// drivePath must be the block device path to trigger on, in the format "/dev/sd*", or a symlink to it.
+// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
+func udevadmChangeToDrive(drivePath string) error {
+	glog.V(5).Infof("udevadmChangeToDrive: drive=%q", drivePath)
+
+	// Evaluate symlink, if any
+	drive, err := filepath.EvalSymlinks(drivePath)
+	if err != nil {
+		return fmt.Errorf("udevadmChangeToDrive: filepath.EvalSymlinks(%q) failed with %v.", drivePath, err)
+	}
+	glog.V(5).Infof("udevadmChangeToDrive: symlink path is %q", drive)
+
+	// Check to make sure the resolved path is "/dev/sd*". A prefix check is used because a substring check would also accept unrelated paths that merely contain "/dev/sd".
+	if !strings.HasPrefix(drive, diskSDPath) {
+		return fmt.Errorf("udevadmChangeToDrive: expected input in the form \"%s\" but drive is %q.", diskSDPattern, drive)
+	}
+
+	// Call "udevadm trigger --action=change --property-match=DEVNAME=/dev/sd..." and keep its output for the error message.
+	out, err := exec.New().Command(
+		"udevadm",
+		"trigger",
+		"--action=change",
+		fmt.Sprintf("--property-match=DEVNAME=%s", drive)).CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("udevadmChangeToDrive: udevadm trigger failed for drive %q with %v (output: %q).", drive, err, out)
+	}
 
 	return nil
 }