Merge pull request #10169 from saad-ali/fixPDIssue2

Work around for PDs stop mounting after a few hours issue
This commit is contained in:
Zach Loafman 2015-06-30 15:47:21 -07:00
commit 7df8d76a93
3 changed files with 439 additions and 35 deletions

View File

@ -0,0 +1,92 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationmanager
import (
"fmt"
"sync"
)
// Operation Manager is a thread-safe interface for keeping track of multiple pending async operations.
type OperationManager interface {
// Called when the operation with the given ID has started.
// Creates a new channel with specified buffer size tracked with the specified ID.
// Returns a read-only version of the newly created channel.
// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
Start(id string, bufferSize uint) (<-chan interface{}, error)
// Called when the operation with the given ID has terminated.
// Closes and removes the channel associated with ID.
// Returns an error if no associated channel exists.
Close(id string) error
// Attempts to send msg to the channel associated with ID.
// Returns an error if no associated channel exists.
Send(id string, msg interface{}) error
}
// Returns a new instance of a channel manager.
func NewOperationManager() OperationManager {
return &operationManager{
chanMap: make(map[string]chan interface{}),
}
}
type operationManager struct {
sync.RWMutex
chanMap map[string]chan interface{}
}
// Called when the operation with the given ID has started.
// Creates a new channel with specified buffer size tracked with the specified ID.
// Returns a read-only version of the newly created channel.
// Returns an error if an entry with the specified ID already exists (previous entry must be removed by calling Close).
func (cm *operationManager) Start(id string, bufferSize uint) (<-chan interface{}, error) {
cm.Lock()
defer cm.Unlock()
if _, exists := cm.chanMap[id]; exists {
return nil, fmt.Errorf("id %q already exists", id)
}
cm.chanMap[id] = make(chan interface{}, bufferSize)
return cm.chanMap[id], nil
}
// Called when the operation with the given ID has terminated.
// Closes and removes the channel associated with ID.
// Returns an error if no associated channel exists.
func (cm *operationManager) Close(id string) error {
cm.Lock()
defer cm.Unlock()
if _, exists := cm.chanMap[id]; !exists {
return fmt.Errorf("id %q not found", id)
}
close(cm.chanMap[id])
delete(cm.chanMap, id)
return nil
}
// Attempts to send msg to the channel associated with ID.
// Returns an error if no associated channel exists.
func (cm *operationManager) Send(id string, msg interface{}) error {
cm.RLock()
defer cm.RUnlock()
if _, exists := cm.chanMap[id]; !exists {
return fmt.Errorf("id %q not found", id)
}
cm.chanMap[id] <- msg
return nil
}

View File

@ -0,0 +1,139 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Channel Manager keeps track of multiple channels
package operationmanager
import (
"testing"
)
func TestStart(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId := "testChanId"
testMsg := "test message"
// Act
ch, startErr := cm.Start(chanId, 1 /* bufferSize */)
sigErr := cm.Send(chanId, testMsg)
// Assert
if startErr != nil {
t.Fatalf("Unexpected error on Start. Expected: <no error> Actual: <%v>", startErr)
}
if sigErr != nil {
t.Fatalf("Unexpected error on Send. Expected: <no error> Actual: <%v>", sigErr)
}
if actual := <-ch; actual != testMsg {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg, actual)
}
}
func TestStartIdExists(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId := "testChanId"
// Act
_, startErr1 := cm.Start(chanId, 1 /* bufferSize */)
_, startErr2 := cm.Start(chanId, 1 /* bufferSize */)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 == nil {
t.Fatalf("Expected error on Start2. Expected: <id already exists error> Actual: <no error>")
}
}
func TestStartAndAdd2Chans(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId1 := "testChanId1"
chanId2 := "testChanId2"
testMsg1 := "test message 1"
testMsg2 := "test message 2"
// Act
ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
sigErr1 := cm.Send(chanId1, testMsg1)
sigErr2 := cm.Send(chanId2, testMsg2)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 != nil {
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
}
if sigErr1 != nil {
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
}
if sigErr2 != nil {
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
}
if actual := <-ch1; actual != testMsg1 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
}
if actual := <-ch2; actual != testMsg2 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
}
}
func TestStartAndAdd2ChansAndClose(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId1 := "testChanId1"
chanId2 := "testChanId2"
testMsg1 := "test message 1"
testMsg2 := "test message 2"
// Act
ch1, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
ch2, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
sigErr1 := cm.Send(chanId1, testMsg1)
sigErr2 := cm.Send(chanId2, testMsg2)
cm.Close(chanId1)
sigErr3 := cm.Send(chanId1, testMsg1)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 != nil {
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
}
if sigErr1 != nil {
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
}
if sigErr2 != nil {
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
}
if sigErr3 == nil {
t.Fatalf("Expected error on Send3. Expected: <error> Actual: <no error>", sigErr2)
}
if actual := <-ch1; actual != testMsg1 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
}
if actual := <-ch2; actual != testMsg2 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
}
}

View File

@ -17,65 +17,63 @@ limitations under the License.
package gce_pd
import (
"errors"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/operationmanager"
"github.com/golang/glog"
)
const (
diskByIdPath = "/dev/disk/by-id/"
diskGooglePrefix = "google-"
diskScsiGooglePrefix = "scsi-0Google_PersistentDisk_"
diskPartitionSuffix = "-part"
diskSDPath = "/dev/sd"
diskSDPattern = "/dev/sd*"
maxChecks = 10
maxRetries = 10
checkSleepDuration = time.Second
)
// Singleton operation manager for managing detach clean up go routines
var detachCleanupManager = operationmanager.NewOperationManager()
type GCEDiskUtil struct{}
// Attaches a disk specified by a volume.GCEPersistentDisk to the current kubelet.
// Mounts the disk to it's global path.
func (util *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath string) error {
func (diskUtil *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath string) error {
glog.V(5).Infof("AttachAndMountDisk(pd, %q) where pd is %#v\r\n", globalPDPath, pd)
// Terminate any in progress verify detach go routines, this will block until the goroutine is ready to exit because the channel is unbuffered
detachCleanupManager.Send(pd.pdName, true)
sdBefore, err := filepath.Glob(diskSDPattern)
if err != nil {
glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
}
sdBeforeSet := util.NewStringSet(sdBefore...)
gce, err := cloudprovider.GetCloudProvider("gce", nil)
if err != nil {
return err
}
if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil {
return err
}
devicePaths := []string{
path.Join("/dev/disk/by-id/", "google-"+pd.pdName),
path.Join("/dev/disk/by-id/", "scsi-0Google_PersistentDisk_"+pd.pdName),
}
if pd.partition != "" {
for i, path := range devicePaths {
devicePaths[i] = path + "-part" + pd.partition
}
}
//TODO(jonesdl) There should probably be better method than busy-waiting here.
numTries := 0
devicePath := ""
// Wait for the disk device to be created
for {
for _, path := range devicePaths {
_, err := os.Stat(path)
if err == nil {
devicePath = path
break
}
if err != nil && !os.IsNotExist(err) {
return err
}
}
if devicePath != "" {
break
}
numTries++
if numTries == 10 {
return errors.New("Could not attach disk: Timeout after 10s")
}
time.Sleep(time.Second)
devicePath, err := verifyAttached(pd, sdBeforeSet, gce)
if err != nil {
return err
}
// Only mount the PD globally once.
@ -108,6 +106,11 @@ func (util *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath
func (util *GCEDiskUtil) DetachDisk(pd *gcePersistentDisk) error {
// Unmount the global PD mount, which should be the only one.
globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName)
glog.V(5).Infof("DetachDisk(pd) where pd is %#v and the globalPDPath is %q\r\n", pd, globalPDPath)
// Terminate any in progress verify detach go routines, this will block until the goroutine is ready to exit because the channel is unbuffered
detachCleanupManager.Send(pd.pdName, true)
if err := pd.mounter.Unmount(globalPDPath); err != nil {
return err
}
@ -122,6 +125,176 @@ func (util *GCEDiskUtil) DetachDisk(pd *gcePersistentDisk) error {
if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil {
return err
}
// Verify disk detached, retry if needed.
go verifyDetached(pd, gce)
return nil
}
// Verifys the disk device to be created has been succesffully attached, and retries if it fails.
func verifyAttached(pd *gcePersistentDisk, sdBeforeSet util.StringSet, gce cloudprovider.Interface) (string, error) {
devicePaths := getDiskByIdPaths(pd)
for numRetries := 0; numRetries < maxRetries; numRetries++ {
for numChecks := 0; numChecks < maxChecks; numChecks++ {
if err := udevadmChangeToNewDrives(sdBeforeSet); err != nil {
// udevadm errors should not block disk attachment, log and continue
glog.Errorf("%v", err)
}
for _, path := range devicePaths {
if pathExists, err := pathExists(path); err != nil {
return "", err
} else if pathExists {
// A device path has succesfully been created for the PD
glog.V(5).Infof("Succesfully attached GCE PD %q.", pd.pdName)
return path, nil
}
}
// Sleep then check again
glog.V(5).Infof("Waiting for GCE PD %q to attach.", pd.pdName)
time.Sleep(checkSleepDuration)
}
// Try attaching the disk again
glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", pd.pdName)
if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil {
return "", err
}
}
return "", fmt.Errorf("Could not attach GCE PD %q. Timeout waiting for mount paths to be created.", pd.pdName)
}
// Veify the specified persistent disk device has been succesfully detached, and retries if it fails.
// This function is intended to be called asynchronously as a go routine.
func verifyDetached(pd *gcePersistentDisk, gce cloudprovider.Interface) {
defer util.HandleCrash()
// Setting bufferSize to 0 so that when senders send, they are blocked until we recieve. This avoids the need to have a separate exit check.
ch, err := detachCleanupManager.Start(pd.pdName, 0 /* bufferSize */)
if err != nil {
glog.Errorf("Error adding %q to detachCleanupManager: %v", pd.pdName, err)
return
}
defer detachCleanupManager.Close(pd.pdName)
devicePaths := getDiskByIdPaths(pd)
for numRetries := 0; numRetries < maxRetries; numRetries++ {
for numChecks := 0; numChecks < maxChecks; numChecks++ {
select {
case <-ch:
glog.Warningf("Terminating GCE PD %q detach verification. Another attach/detach call was made for this PD.", pd.pdName)
return
default:
allPathsRemoved := true
for _, path := range devicePaths {
if err := udevadmChangeToDrive(path); err != nil {
// udevadm errors should not block disk detachment, log and continue
glog.Errorf("%v", err)
}
if exists, err := pathExists(path); err != nil {
glog.Errorf("Error check path: %v", err)
return
} else {
allPathsRemoved = allPathsRemoved && !exists
}
}
if allPathsRemoved {
// All paths to the PD have been succefully removed
glog.V(5).Infof("Succesfully detached GCE PD %q.", pd.pdName)
return
}
// Sleep then check again
glog.V(5).Infof("Waiting for GCE PD %q to detach.", pd.pdName)
time.Sleep(checkSleepDuration)
}
}
// Try detaching disk again
glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", pd.pdName)
if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil {
glog.Errorf("Error on retry detach PD %q: %v", pd.pdName, err)
return
}
}
glog.Errorf("Could not detach GCE PD %q. One or more mount paths was not removed.", pd.pdName)
}
// Returns list of all /dev/disk/by-id/* paths for given PD.
func getDiskByIdPaths(pd *gcePersistentDisk) []string {
devicePaths := []string{
path.Join(diskByIdPath, diskGooglePrefix+pd.pdName),
path.Join(diskByIdPath, diskScsiGooglePrefix+pd.pdName),
}
if pd.partition != "" {
for i, path := range devicePaths {
devicePaths[i] = path + diskPartitionSuffix + pd.partition
}
}
return devicePaths
}
// Checks if the specified path exists
func pathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
} else if os.IsNotExist(err) {
return false, nil
} else {
return false, err
}
}
// Calls "udevadm trigger --action=change" for newly created "/dev/sd*" drives (exist only in after set).
// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
func udevadmChangeToNewDrives(sdBeforeSet util.StringSet) error {
sdAfter, err := filepath.Glob(diskSDPattern)
if err != nil {
return fmt.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
}
for _, sd := range sdAfter {
if !sdBeforeSet.Has(sd) {
return udevadmChangeToDrive(sd)
}
}
return nil
}
// Calls "udevadm trigger --action=change" on the specified drive.
// drivePath must be the the block device path to trigger on, in the format "/dev/sd*", or a symlink to it.
// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
func udevadmChangeToDrive(drivePath string) error {
glog.V(5).Infof("udevadmChangeToDrive: drive=%q", drivePath)
// Evaluate symlink, if any
drive, err := filepath.EvalSymlinks(drivePath)
if err != nil {
return fmt.Errorf("udevadmChangeToDrive: filepath.EvalSymlinks(%q) failed with %v.", drivePath, err)
}
glog.V(5).Infof("udevadmChangeToDrive: symlink path is %q", drive)
// Check to make sure input is "/dev/sd*"
if !strings.Contains(drive, diskSDPath) {
return fmt.Errorf("udevadmChangeToDrive: expected input in the form \"%s\" but drive is %q.", diskSDPattern, drive)
}
// Call "udevadm trigger --action=change --property-match=DEVNAME=/dev/sd..."
_, err = exec.New().Command(
"udevadm",
"trigger",
"--action=change",
fmt.Sprintf("--property-match=DEVNAME=%s", drive)).CombinedOutput()
if err != nil {
return fmt.Errorf("udevadmChangeToDrive: udevadm trigger failed for drive %q with %v.", drive, err)
}
return nil
}