Merge pull request #11376 from jiangyaoguo/new-builder-cleaner-in-volume

Refactor awsElasticBlockStore to seperate builder and cleaner
This commit is contained in:
Jerzy Szczepkowski 2015-07-31 12:57:43 +02:00
commit ff058a1afe
3 changed files with 115 additions and 74 deletions

View File

@ -94,18 +94,19 @@ func (plugin *awsElasticBlockStorePlugin) newBuilderInternal(spec *volume.Spec,
partition = strconv.Itoa(ebs.Partition) partition = strconv.Itoa(ebs.Partition)
} }
return &awsElasticBlockStore{ return &awsElasticBlockStoreBuilder{
podUID: podUID, awsElasticBlockStore: &awsElasticBlockStore{
volName: spec.Name, podUID: podUID,
volumeID: volumeID, volName: spec.Name,
volumeID: volumeID,
manager: manager,
mounter: mounter,
plugin: plugin,
},
fsType: fsType, fsType: fsType,
partition: partition, partition: partition,
readOnly: readOnly, readOnly: readOnly,
manager: manager, diskMounter: &awsSafeFormatAndMount{mounter, exec.New()}}, nil
mounter: mounter,
diskMounter: &awsSafeFormatAndMount{mounter, exec.New()},
plugin: plugin,
}, nil
} }
func (plugin *awsElasticBlockStorePlugin) NewCleaner(volName string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) { func (plugin *awsElasticBlockStorePlugin) NewCleaner(volName string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
@ -114,22 +115,21 @@ func (plugin *awsElasticBlockStorePlugin) NewCleaner(volName string, podUID type
} }
func (plugin *awsElasticBlockStorePlugin) newCleanerInternal(volName string, podUID types.UID, manager ebsManager, mounter mount.Interface) (volume.Cleaner, error) { func (plugin *awsElasticBlockStorePlugin) newCleanerInternal(volName string, podUID types.UID, manager ebsManager, mounter mount.Interface) (volume.Cleaner, error) {
return &awsElasticBlockStore{ return &awsElasticBlockStoreCleaner{&awsElasticBlockStore{
podUID: podUID, podUID: podUID,
volName: volName, volName: volName,
manager: manager, manager: manager,
mounter: mounter, mounter: mounter,
diskMounter: &awsSafeFormatAndMount{mounter, exec.New()}, plugin: plugin,
plugin: plugin, }}, nil
}, nil
} }
// Abstract interface to PD operations. // Abstract interface to PD operations.
type ebsManager interface { type ebsManager interface {
// Attaches the disk to the kubelet's host machine. // Attaches the disk to the kubelet's host machine.
AttachAndMountDisk(ebs *awsElasticBlockStore, globalPDPath string) error AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error
// Detaches the disk from the kubelet's host machine. // Detaches the disk from the kubelet's host machine.
DetachDisk(ebs *awsElasticBlockStore) error DetachDisk(c *awsElasticBlockStoreCleaner) error
} }
// awsElasticBlockStore volumes are disk resources provided by Google Compute Engine // awsElasticBlockStore volumes are disk resources provided by Google Compute Engine
@ -139,23 +139,15 @@ type awsElasticBlockStore struct {
podUID types.UID podUID types.UID
// Unique id of the PD, used to find the disk resource in the provider. // Unique id of the PD, used to find the disk resource in the provider.
volumeID string volumeID string
// Filesystem type, optional.
fsType string
// Specifies the partition to mount
partition string
// Specifies whether the disk will be attached as read-only.
readOnly bool
// Utility interface that provides API calls to the provider to attach/detach disks. // Utility interface that provides API calls to the provider to attach/detach disks.
manager ebsManager manager ebsManager
// Mounter interface that provides system calls to mount the global path to the pod local path. // Mounter interface that provides system calls to mount the global path to the pod local path.
mounter mount.Interface mounter mount.Interface
// diskMounter provides the interface that is used to mount the actual block device. plugin *awsElasticBlockStorePlugin
diskMounter mount.Interface
plugin *awsElasticBlockStorePlugin
} }
func detachDiskLogError(ebs *awsElasticBlockStore) { func detachDiskLogError(ebs *awsElasticBlockStore) {
err := ebs.manager.DetachDisk(ebs) err := ebs.manager.DetachDisk(&awsElasticBlockStoreCleaner{ebs})
if err != nil { if err != nil {
glog.Warningf("Failed to detach disk: %v (%v)", ebs, err) glog.Warningf("Failed to detach disk: %v (%v)", ebs, err)
} }
@ -175,15 +167,29 @@ func (ebs *awsElasticBlockStore) getVolumeProvider() (aws_cloud.Volumes, error)
return volumes, nil return volumes, nil
} }
type awsElasticBlockStoreBuilder struct {
*awsElasticBlockStore
// Filesystem type, optional.
fsType string
// Specifies the partition to mount
partition string
// Specifies whether the disk will be attached as read-only.
readOnly bool
// diskMounter provides the interface that is used to mount the actual block device.
diskMounter mount.Interface
}
var _ volume.Builder = &awsElasticBlockStoreBuilder{}
// SetUp attaches the disk and bind mounts to the volume path. // SetUp attaches the disk and bind mounts to the volume path.
func (ebs *awsElasticBlockStore) SetUp() error { func (b *awsElasticBlockStoreBuilder) SetUp() error {
return ebs.SetUpAt(ebs.GetPath()) return b.SetUpAt(b.GetPath())
} }
// SetUpAt attaches the disk and bind mounts to the volume path. // SetUpAt attaches the disk and bind mounts to the volume path.
func (ebs *awsElasticBlockStore) SetUpAt(dir string) error { func (b *awsElasticBlockStoreBuilder) SetUpAt(dir string) error {
// TODO: handle failed mounts here. // TODO: handle failed mounts here.
mountpoint, err := ebs.mounter.IsMountPoint(dir) mountpoint, err := b.mounter.IsMountPoint(dir)
glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, mountpoint, err) glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, mountpoint, err)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return err return err
@ -192,35 +198,35 @@ func (ebs *awsElasticBlockStore) SetUpAt(dir string) error {
return nil return nil
} }
globalPDPath := makeGlobalPDPath(ebs.plugin.host, ebs.volumeID) globalPDPath := makeGlobalPDPath(b.plugin.host, b.volumeID)
if err := ebs.manager.AttachAndMountDisk(ebs, globalPDPath); err != nil { if err := b.manager.AttachAndMountDisk(b, globalPDPath); err != nil {
return err return err
} }
if err := os.MkdirAll(dir, 0750); err != nil { if err := os.MkdirAll(dir, 0750); err != nil {
// TODO: we should really eject the attach/detach out into its own control loop. // TODO: we should really eject the attach/detach out into its own control loop.
detachDiskLogError(ebs) detachDiskLogError(b.awsElasticBlockStore)
return err return err
} }
// Perform a bind mount to the full path to allow duplicate mounts of the same PD. // Perform a bind mount to the full path to allow duplicate mounts of the same PD.
options := []string{"bind"} options := []string{"bind"}
if ebs.readOnly { if b.readOnly {
options = append(options, "ro") options = append(options, "ro")
} }
err = ebs.mounter.Mount(globalPDPath, dir, "", options) err = b.mounter.Mount(globalPDPath, dir, "", options)
if err != nil { if err != nil {
mountpoint, mntErr := ebs.mounter.IsMountPoint(dir) mountpoint, mntErr := b.mounter.IsMountPoint(dir)
if mntErr != nil { if mntErr != nil {
glog.Errorf("isMountpoint check failed: %v", mntErr) glog.Errorf("isMountpoint check failed: %v", mntErr)
return err return err
} }
if mountpoint { if mountpoint {
if mntErr = ebs.mounter.Unmount(dir); mntErr != nil { if mntErr = b.mounter.Unmount(dir); mntErr != nil {
glog.Errorf("Failed to unmount: %v", mntErr) glog.Errorf("Failed to unmount: %v", mntErr)
return err return err
} }
mountpoint, mntErr := ebs.mounter.IsMountPoint(dir) mountpoint, mntErr := b.mounter.IsMountPoint(dir)
if mntErr != nil { if mntErr != nil {
glog.Errorf("isMountpoint check failed: %v", mntErr) glog.Errorf("isMountpoint check failed: %v", mntErr)
return err return err
@ -233,15 +239,15 @@ func (ebs *awsElasticBlockStore) SetUpAt(dir string) error {
} }
os.Remove(dir) os.Remove(dir)
// TODO: we should really eject the attach/detach out into its own control loop. // TODO: we should really eject the attach/detach out into its own control loop.
detachDiskLogError(ebs) detachDiskLogError(b.awsElasticBlockStore)
return err return err
} }
return nil return nil
} }
func (pd *awsElasticBlockStore) IsReadOnly() bool { func (b *awsElasticBlockStoreBuilder) IsReadOnly() bool {
return pd.readOnly return b.readOnly
} }
func makeGlobalPDPath(host volume.VolumeHost, volumeID string) string { func makeGlobalPDPath(host volume.VolumeHost, volumeID string) string {
@ -274,16 +280,22 @@ func (ebs *awsElasticBlockStore) GetPath() string {
return ebs.plugin.host.GetPodVolumeDir(ebs.podUID, util.EscapeQualifiedNameForDisk(name), ebs.volName) return ebs.plugin.host.GetPodVolumeDir(ebs.podUID, util.EscapeQualifiedNameForDisk(name), ebs.volName)
} }
type awsElasticBlockStoreCleaner struct {
*awsElasticBlockStore
}
var _ volume.Cleaner = &awsElasticBlockStoreCleaner{}
// Unmounts the bind mount, and detaches the disk only if the PD // Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet. // resource was the last reference to that disk on the kubelet.
func (ebs *awsElasticBlockStore) TearDown() error { func (c *awsElasticBlockStoreCleaner) TearDown() error {
return ebs.TearDownAt(ebs.GetPath()) return c.TearDownAt(c.GetPath())
} }
// Unmounts the bind mount, and detaches the disk only if the PD // Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet. // resource was the last reference to that disk on the kubelet.
func (ebs *awsElasticBlockStore) TearDownAt(dir string) error { func (c *awsElasticBlockStoreCleaner) TearDownAt(dir string) error {
mountpoint, err := ebs.mounter.IsMountPoint(dir) mountpoint, err := c.mounter.IsMountPoint(dir)
if err != nil { if err != nil {
glog.V(2).Info("Error checking if mountpoint ", dir, ": ", err) glog.V(2).Info("Error checking if mountpoint ", dir, ": ", err)
return err return err
@ -293,7 +305,7 @@ func (ebs *awsElasticBlockStore) TearDownAt(dir string) error {
return os.Remove(dir) return os.Remove(dir)
} }
refs, err := mount.GetMountRefs(ebs.mounter, dir) refs, err := mount.GetMountRefs(c.mounter, dir)
if err != nil { if err != nil {
glog.V(2).Info("Error getting mountrefs for ", dir, ": ", err) glog.V(2).Info("Error getting mountrefs for ", dir, ": ", err)
return err return err
@ -302,27 +314,27 @@ func (ebs *awsElasticBlockStore) TearDownAt(dir string) error {
glog.Warning("Did not find pod-mount for ", dir, " during tear-down") glog.Warning("Did not find pod-mount for ", dir, " during tear-down")
} }
// Unmount the bind-mount inside this pod // Unmount the bind-mount inside this pod
if err := ebs.mounter.Unmount(dir); err != nil { if err := c.mounter.Unmount(dir); err != nil {
glog.V(2).Info("Error unmounting dir ", dir, ": ", err) glog.V(2).Info("Error unmounting dir ", dir, ": ", err)
return err return err
} }
// If len(refs) is 1, then all bind mounts have been removed, and the // If len(refs) is 1, then all bind mounts have been removed, and the
// remaining reference is the global mount. It is safe to detach. // remaining reference is the global mount. It is safe to detach.
if len(refs) == 1 { if len(refs) == 1 {
// ebs.volumeID is not initially set for volume-cleaners, so set it here. // c.volumeID is not initially set for volume-cleaners, so set it here.
ebs.volumeID, err = getVolumeIDFromGlobalMount(ebs.plugin.host, refs[0]) c.volumeID, err = getVolumeIDFromGlobalMount(c.plugin.host, refs[0])
if err != nil { if err != nil {
glog.V(2).Info("Could not determine volumeID from mountpoint ", refs[0], ": ", err) glog.V(2).Info("Could not determine volumeID from mountpoint ", refs[0], ": ", err)
return err return err
} }
if err := ebs.manager.DetachDisk(ebs); err != nil { if err := c.manager.DetachDisk(&awsElasticBlockStoreCleaner{c.awsElasticBlockStore}); err != nil {
glog.V(2).Info("Error detaching disk ", ebs.volumeID, ": ", err) glog.V(2).Info("Error detaching disk ", c.volumeID, ": ", err)
return err return err
} }
} else { } else {
glog.V(2).Infof("Found multiple refs; won't detach EBS volume: %v", refs) glog.V(2).Infof("Found multiple refs; won't detach EBS volume: %v", refs)
} }
mountpoint, mntErr := ebs.mounter.IsMountPoint(dir) mountpoint, mntErr := c.mounter.IsMountPoint(dir)
if mntErr != nil { if mntErr != nil {
glog.Errorf("isMountpoint check failed: %v", mntErr) glog.Errorf("isMountpoint check failed: %v", mntErr)
return err return err

View File

@ -76,8 +76,8 @@ type fakePDManager struct{}
// TODO(jonesdl) To fully test this, we could create a loopback device // TODO(jonesdl) To fully test this, we could create a loopback device
// and mount that instead. // and mount that instead.
func (fake *fakePDManager) AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPath string) error { func (fake *fakePDManager) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
globalPath := makeGlobalPDPath(pd.plugin.host, pd.volumeID) globalPath := makeGlobalPDPath(b.plugin.host, b.volumeID)
err := os.MkdirAll(globalPath, 0750) err := os.MkdirAll(globalPath, 0750)
if err != nil { if err != nil {
return err return err
@ -85,8 +85,8 @@ func (fake *fakePDManager) AttachAndMountDisk(pd *awsElasticBlockStore, globalPD
return nil return nil
} }
func (fake *fakePDManager) DetachDisk(pd *awsElasticBlockStore) error { func (fake *fakePDManager) DetachDisk(c *awsElasticBlockStoreCleaner) error {
globalPath := makeGlobalPDPath(pd.plugin.host, pd.volumeID) globalPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
err := os.RemoveAll(globalPath) err := os.RemoveAll(globalPath)
if err != nil { if err != nil {
return err return err
@ -206,3 +206,32 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) {
t.Errorf("Expected true for builder.IsReadOnly") t.Errorf("Expected true for builder.IsReadOnly")
} }
} }
func TestBuilderAndCleanerTypeAssert(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-ebs")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
spec := &api.Volume{
Name: "vol1",
VolumeSource: api.VolumeSource{
AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
VolumeID: "pd",
FSType: "ext4",
},
},
}
builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
if _, ok := builder.(volume.Cleaner); ok {
t.Errorf("Volume Builder can be type-assert to Cleaner")
}
cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
if _, ok := cleaner.(volume.Builder); ok {
t.Errorf("Volume Cleaner can be type-assert to Builder")
}
}

View File

@ -31,17 +31,17 @@ type AWSDiskUtil struct{}
// Attaches a disk specified by a volume.AWSElasticBlockStore to the current kubelet. // Attaches a disk specified by a volume.AWSElasticBlockStore to the current kubelet.
// Mounts the disk to it's global path. // Mounts the disk to it's global path.
func (util *AWSDiskUtil) AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPath string) error { func (util *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
volumes, err := pd.getVolumeProvider() volumes, err := b.getVolumeProvider()
if err != nil { if err != nil {
return err return err
} }
devicePath, err := volumes.AttachDisk("", pd.volumeID, pd.readOnly) devicePath, err := volumes.AttachDisk("", b.volumeID, b.readOnly)
if err != nil { if err != nil {
return err return err
} }
if pd.partition != "" { if b.partition != "" {
devicePath = devicePath + pd.partition devicePath = devicePath + b.partition
} }
//TODO(jonesdl) There should probably be better method than busy-waiting here. //TODO(jonesdl) There should probably be better method than busy-waiting here.
numTries := 0 numTries := 0
@ -61,7 +61,7 @@ func (util *AWSDiskUtil) AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPa
} }
// Only mount the PD globally once. // Only mount the PD globally once.
mountpoint, err := pd.mounter.IsMountPoint(globalPDPath) mountpoint, err := b.mounter.IsMountPoint(globalPDPath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
if err := os.MkdirAll(globalPDPath, 0750); err != nil { if err := os.MkdirAll(globalPDPath, 0750); err != nil {
@ -73,11 +73,11 @@ func (util *AWSDiskUtil) AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPa
} }
} }
options := []string{} options := []string{}
if pd.readOnly { if b.readOnly {
options = append(options, "ro") options = append(options, "ro")
} }
if !mountpoint { if !mountpoint {
err = pd.diskMounter.Mount(devicePath, globalPDPath, pd.fsType, options) err = b.diskMounter.Mount(devicePath, globalPDPath, b.fsType, options)
if err != nil { if err != nil {
os.Remove(globalPDPath) os.Remove(globalPDPath)
return err return err
@ -87,10 +87,10 @@ func (util *AWSDiskUtil) AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPa
} }
// Unmounts the device and detaches the disk from the kubelet's host machine. // Unmounts the device and detaches the disk from the kubelet's host machine.
func (util *AWSDiskUtil) DetachDisk(pd *awsElasticBlockStore) error { func (util *AWSDiskUtil) DetachDisk(c *awsElasticBlockStoreCleaner) error {
// Unmount the global PD mount, which should be the only one. // Unmount the global PD mount, which should be the only one.
globalPDPath := makeGlobalPDPath(pd.plugin.host, pd.volumeID) globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
if err := pd.mounter.Unmount(globalPDPath); err != nil { if err := c.mounter.Unmount(globalPDPath); err != nil {
glog.V(2).Info("Error unmount dir ", globalPDPath, ": ", err) glog.V(2).Info("Error unmount dir ", globalPDPath, ": ", err)
return err return err
} }
@ -99,13 +99,13 @@ func (util *AWSDiskUtil) DetachDisk(pd *awsElasticBlockStore) error {
return err return err
} }
// Detach the disk // Detach the disk
volumes, err := pd.getVolumeProvider() volumes, err := c.getVolumeProvider()
if err != nil { if err != nil {
glog.V(2).Info("Error getting volume provider for volumeID ", pd.volumeID, ": ", err) glog.V(2).Info("Error getting volume provider for volumeID ", c.volumeID, ": ", err)
return err return err
} }
if err := volumes.DetachDisk("", pd.volumeID); err != nil { if err := volumes.DetachDisk("", c.volumeID); err != nil {
glog.V(2).Info("Error detaching disk ", pd.volumeID, ": ", err) glog.V(2).Info("Error detaching disk ", c.volumeID, ": ", err)
return err return err
} }
return nil return nil