Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-25 04:33:26 +00:00)
Merge pull request #56533 from vladimirvivien/csi-featuregate-fix
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

CSI - Fix feature gate bug and add bootstrap RBAC rules

**What this PR does / why we need it**: This PR addresses show-stopper bug https://github.com/kubernetes/kubernetes/issues/56532. It fixes the faulty feature gate logic and adds RBAC rules for kube-controller-manager and kubelet that allow `VolumeAttachment` API operations against the api-server.

**Which issue(s) this PR fixes**: Fixes #56532, #56667

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
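The heart of the feature-gate fix is a one-character change: the CSI volume plugin was only registered when the `CSIPersistentVolume` gate was *disabled*. Below is a minimal sketch of the corrected guard using the identifiers that appear in the diff; the package name `probeCSIPlugin` and the import paths are illustrative assumptions based on the in-tree layout of the time, not the actual file.

```go
package plugins // illustrative package name, not the actual in-tree file

import (
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/csi"
)

// probeCSIPlugin (hypothetical helper) appends the CSI volume plugin only when
// the CSIPersistentVolume feature gate is enabled. The buggy version negated
// this check, so CSI was never probed when the gate was actually on.
func probeCSIPlugin(allPlugins []volume.VolumePlugin) []volume.VolumePlugin {
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
		allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
	}
	return allPlugins
}
```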
This commit is contained in: 050956b08e
@@ -77,7 +77,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin {
     allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
     allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
     allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
-    if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
+    if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
         allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
     }
     return allPlugins

@@ -100,7 +100,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
     allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
     allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
     allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
-    if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
+    if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
         allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
     }
     return allPlugins
@@ -603,3 +603,7 @@ func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.N
 func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) {
     return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller")
 }
+
+func (adc *attachDetachController) GetNodeName() types.NodeName {
+    return ""
+}

@@ -277,3 +277,7 @@ func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*
 func (expc *expandController) GetNodeLabels() (map[string]string, error) {
     return nil, fmt.Errorf("GetNodeLabels unsupported in expandController")
 }
+
+func (expc *expandController) GetNodeName() types.NodeName {
+    return ""
+}

@@ -108,3 +108,7 @@ func (adc *PersistentVolumeController) GetExec(pluginName string) mount.Exec {
 func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, error) {
     return nil, fmt.Errorf("GetNodeLabels() unsupported in PersistentVolumeController")
 }
+
+func (ctrl *PersistentVolumeController) GetNodeName() types.NodeName {
+    return ""
+}
@@ -188,6 +188,10 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
     return node.Labels, nil
 }
 
+func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
+    return kvh.kubelet.nodeName
+}
+
 func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec {
     exec, err := kvh.getMountExec(pluginName)
     if err != nil {
@@ -52,18 +52,19 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
     csiSource, err := getCSISourceFromSpec(spec)
     if err != nil {
         glog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err))
-        return "", errors.New("missing CSI persistent volume")
+        return "", err
     }
 
+    node := string(nodeName)
     pvName := spec.PersistentVolume.GetName()
-    attachID := getAttachmentName(csiSource.VolumeHandle, string(nodeName))
+    attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node)
 
     attachment := &storage.VolumeAttachment{
         ObjectMeta: meta.ObjectMeta{
             Name: attachID,
         },
         Spec: storage.VolumeAttachmentSpec{
-            NodeName: string(nodeName),
+            NodeName: node,
             Attacher: csiPluginName,
             Source: storage.VolumeAttachmentSource{
                 PersistentVolumeName: &pvName,

@@ -72,7 +73,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
         Status: storage.VolumeAttachmentStatus{Attached: false},
     }
 
-    attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
+    _, err = c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
     alreadyExist := false
     if err != nil {
         if !apierrs.IsAlreadyExists(err) {

@@ -83,19 +84,23 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
     }
 
     if alreadyExist {
-        glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attach.GetName(), csiSource.VolumeHandle))
+        glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, csiSource.VolumeHandle))
     } else {
-        glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully, will start probing for updates", attach.GetName(), csiSource.VolumeHandle))
+        glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, csiSource.VolumeHandle))
     }
 
     // probe for attachment update here
     // NOTE: any error from waiting for attachment is logged only. This is because
     // the primariy intent of the enclosing method is to create VolumeAttachment.
     // DONOT return that error here as it is mitigated in attacher.WaitForAttach.
+    volAttachmentOK := true
     if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {
-        glog.Error(log("attacher.Attach encountered error during attachment probing: %v", err))
+        volAttachmentOK = false
+        glog.Error(log("attacher.Attach attempted to wait for attachment to be ready, but failed with: %v", err))
     }
 
+    glog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment verified=%t: attachment object [%s]", volAttachmentOK, attachID))
+
     return attachID, nil
 }
 
@@ -151,7 +156,7 @@ func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, tim
 }
 
 func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
-    glog.V(4).Info(log("probing attachment status for %d volumes ", len(specs)))
+    glog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs)))
 
     attached := make(map[*volume.Spec]bool)
 

@@ -165,13 +170,15 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No
             glog.Error(log("attacher.VolumesAreAttached failed: %v", err))
             continue
         }
-        attachID := getAttachmentName(source.VolumeHandle, string(nodeName))
+
+        attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(nodeName))
         glog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
         attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
         if err != nil {
             glog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
             continue
         }
+        glog.V(4).Info(log("attacher.VolumesAreAttached attachment [%v] has status.attached=%t", attachID, attach.Status.Attached))
         attached[spec] = attach.Status.Attached
     }
 
@@ -201,10 +208,11 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
         glog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
         return errors.New("volumeName missing expected data")
     }
+
+    driverName := parts[0]
     volID := parts[1]
-    attachID := getAttachmentName(volID, string(nodeName))
-    err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil)
-    if err != nil {
+    attachID := getAttachmentName(volID, driverName, string(nodeName))
+    if err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil); err != nil {
         glog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
         return err
     }
@@ -257,12 +265,8 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
     return nil
 }
 
-func hashAttachmentName(volName, nodeName string) string {
-    result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", volName, nodeName)))
-    return fmt.Sprintf("%x", result)
-}
-
-func getAttachmentName(volName, nodeName string) string {
-    // TODO consider using a different prefix for attachment
-    return fmt.Sprintf("pv-%s", hashAttachmentName(volName, nodeName))
+// getAttachmentName returns csi-<sha252(volName,csiDriverName,NodeName>
+func getAttachmentName(volName, csiDriverName, nodeName string) string {
+    result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
+    return fmt.Sprintf("csi-%x", result)
 }
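For reference, the new naming scheme is deterministic across drivers and nodes: the `VolumeAttachment` object name is a `csi-` prefix followed by the SHA-256 hash of the volume handle, CSI driver name, and node name concatenated together. A self-contained sketch, stdlib only, with the helper name mirroring the in-tree function above and the example values taken from the test cases later in this diff:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// getAttachmentName mirrors the helper introduced above: the VolumeAttachment
// object name is derived from the volume handle, the CSI driver name, and the
// node name, so the same triple always maps to the same object name.
func getAttachmentName(volName, csiDriverName, nodeName string) string {
	result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
	return fmt.Sprintf("csi-%x", result)
}

func main() {
	// Any caller with the same inputs computes the same name, which is what
	// lets Attach treat an AlreadyExists error from Create as success.
	fmt.Println(getAttachmentName("vol02", "driver02", "node02"))
}
```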
@@ -17,13 +17,11 @@ limitations under the License.
 package csi
 
 import (
-    "crypto/sha256"
     "fmt"
     "os"
     "testing"
     "time"
 
-    "k8s.io/api/core/v1"
     storage "k8s.io/api/storage/v1alpha1"
     apierrs "k8s.io/apimachinery/pkg/api/errors"
     meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -64,47 +62,93 @@ func TestAttacherAttach(t *testing.T) {
 
     testCases := []struct {
         name       string
-        pv         *v1.PersistentVolume
         nodeName   string
-        attachHash [32]byte
+        driverName string
+        volumeName string
+        attachID   string
         shouldFail bool
     }{
         {
             name:       "test ok 1",
-            pv:         makeTestPV("test-pv-001", 10, testDriver, "test-vol-1"),
-            nodeName:   "test-node",
-            attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-1", "test-node"))),
+            nodeName:   "testnode-01",
+            driverName: "testdriver-01",
+            volumeName: "testvol-01",
+            attachID:   getAttachmentName("testvol-01", "testdriver-01", "testnode-01"),
         },
         {
             name:       "test ok 2",
-            pv:         makeTestPV("test-pv-002", 10, testDriver, "test-vol-002"),
-            nodeName:   "test-node",
-            attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-002", "test-node"))),
+            nodeName:   "node02",
+            driverName: "driver02",
+            volumeName: "vol02",
+            attachID:   getAttachmentName("vol02", "driver02", "node02"),
         },
         {
-            name:       "missing spec",
-            pv:         nil,
-            nodeName:   "test-node",
-            attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-3", "test-node"))),
+            name:       "mismatch vol",
+            nodeName:   "node02",
+            driverName: "driver02",
+            volumeName: "vol01",
+            attachID:   getAttachmentName("vol02", "driver02", "node02"),
+            shouldFail: true,
+        },
+        {
+            name:       "mismatch driver",
+            nodeName:   "node02",
+            driverName: "driver000",
+            volumeName: "vol02",
+            attachID:   getAttachmentName("vol02", "driver02", "node02"),
+            shouldFail: true,
+        },
+        {
+            name:       "mismatch node",
+            nodeName:   "node000",
+            driverName: "driver000",
+            volumeName: "vol02",
+            attachID:   getAttachmentName("vol02", "driver02", "node02"),
             shouldFail: true,
         },
     }
 
-    for _, tc := range testCases {
-        var spec *volume.Spec
-        if tc.pv != nil {
-            spec = volume.NewSpecFromPersistentVolume(tc.pv, tc.pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
+    // attacher loop
+    for i, tc := range testCases {
+        t.Log("test case: ", tc.name)
+        spec := volume.NewSpecFromPersistentVolume(makeTestPV(fmt.Sprintf("test-pv%d", i), 10, tc.driverName, tc.volumeName), false)
+
+        go func(id, nodename string, fail bool) {
+            attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename))
+            if !fail && err != nil {
+                t.Error("was not expecting failure, but got err: ", err)
+            }
+            if attachID != id && !fail {
+                t.Errorf("expecting attachID %v, got %v", id, attachID)
+            }
+        }(tc.attachID, tc.nodeName, tc.shouldFail)
+
+        // update attachment to avoid long waitForAttachment
+        ticker := time.NewTicker(10 * time.Millisecond)
+        defer ticker.Stop()
+        // wait for attachment to be saved
+        var attach *storage.VolumeAttachment
+        for i := 0; i < 100; i++ {
+            attach, err = csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Get(tc.attachID, meta.GetOptions{})
+            if err != nil {
+                if apierrs.IsNotFound(err) {
+                    <-ticker.C
+                    continue
+                }
+                t.Error(err)
+            }
+            if attach != nil {
+                break
+            }
         }
 
-        attachID, err := csiAttacher.Attach(spec, types.NodeName(tc.nodeName))
-        if tc.shouldFail && err == nil {
-            t.Error("expected failure, but got nil err")
+        if attach == nil {
+            t.Error("attachment not found")
         }
-        if attachID != "" {
-            expectedID := fmt.Sprintf("pv-%x", tc.attachHash)
-            if attachID != expectedID {
-                t.Errorf("expecting attachID %v, got %v", expectedID, attachID)
-            }
+        attach.Status.Attached = true
+        _, err = csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Update(attach)
+        if err != nil {
+            t.Error(err)
         }
     }
 }
@@ -136,8 +180,8 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
     for i, tc := range testCases {
         t.Logf("running test: %v", tc.name)
         pvName := fmt.Sprintf("test-pv-%d", i)
-        attachID := fmt.Sprintf("pv-%s", hashAttachmentName(pvName, nodeName))
+        volID := fmt.Sprintf("test-vol-%d", i)
+        attachID := getAttachmentName(volID, testDriver, nodeName)
         attachment := makeTestAttachment(attachID, nodeName, pvName)
         attachment.Status.Attached = tc.attached
         attachment.Status.AttachError = tc.attachErr

@@ -150,7 +194,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
             }
         }()
 
-        retID, err := csiAttacher.waitForVolumeAttachment("test-vol", attachID, tc.timeout)
+        retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout)
         if tc.shouldFail && err == nil {
             t.Error("expecting failure, but err is nil")
         }
@@ -192,7 +236,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) {
         pv := makeTestPV("test-pv", 10, testDriver, volName)
         spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
         specs = append(specs, spec)
-        attachID := getAttachmentName(volName, nodeName)
+        attachID := getAttachmentName(volName, testDriver, nodeName)
         attachment := makeTestAttachment(attachID, nodeName, pv.GetName())
         attachment.Status.Attached = stat
         _, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
@@ -239,9 +283,9 @@ func TestAttacherDetach(t *testing.T) {
         attachID   string
         shouldFail bool
     }{
-        {name: "normal test", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-001", nodeName))},
-        {name: "normal test 2", volID: "vol-002", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName))},
-        {name: "object not found", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName)), shouldFail: true},
+        {name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)},
+        {name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)},
+        {name: "object not found", volID: "vol-001", attachID: getAttachmentName("vol-002", testDriver, nodeName), shouldFail: true},
     }
 
     for _, tc := range testCases {
@@ -24,7 +24,6 @@ import (
     "github.com/golang/glog"
     grpctx "golang.org/x/net/context"
     api "k8s.io/api/core/v1"
-    storage "k8s.io/api/storage/v1alpha1"
     meta "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/kubernetes"
@@ -77,11 +76,18 @@ func (c *csiMountMgr) SetUp(fsGroup *int64) error {
 func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
     glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
 
+    csiSource, err := getCSISourceFromSpec(c.spec)
+    if err != nil {
+        glog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
+        return err
+    }
+
     ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout)
     defer cancel()
 
     csi := c.csiClient
-    pvName := c.spec.PersistentVolume.GetName()
+    nodeName := string(c.plugin.host.GetNodeName())
+    attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
 
     // ensure version is supported
     if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {

@@ -92,25 +98,14 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
     // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
     if c.volumeInfo == nil {
 
-        //TODO (vladimirvivien) consider using VolumesAttachments().Get() to retrieve
-        //the object directly. This requires the ability to reconstruct the ID using volumeName+nodeName (nodename may not be avilable)
-        attachList, err := c.k8s.StorageV1alpha1().VolumeAttachments().List(meta.ListOptions{})
+        attachment, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
         if err != nil {
-            glog.Error(log("failed to get volume attachments: %v", err))
+            glog.Error(log("mounter.SetupAt failed while getting volume attachment [id=%v]: %v", attachID, err))
             return err
         }
-
-        var attachment *storage.VolumeAttachment
-        for _, attach := range attachList.Items {
-            if attach.Spec.Source.PersistentVolumeName != nil &&
-                *attach.Spec.Source.PersistentVolumeName == pvName {
-                attachment = &attach
-                break
-            }
-        }
 
         if attachment == nil {
-            glog.Error(log("unable to find VolumeAttachment with PV.name = %s", pvName))
+            glog.Error(log("unable to find VolumeAttachment [id=%s]", attachID))
             return errors.New("no existing VolumeAttachment found")
         }
         c.volumeInfo = attachment.Status.AttachmentMetadata
@@ -122,7 +117,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
         accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
     }
 
-    err := csi.NodePublishVolume(
+    err = csi.NodePublishVolume(
         ctx,
         c.volumeID,
         c.readOnly,

@@ -133,7 +128,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
     )
 
     if err != nil {
-        glog.Errorf(log("Mounter.Setup failed: %v", err))
+        glog.Errorf(log("Mounter.SetupAt failed: %v", err))
         return err
     }
     glog.V(4).Infof(log("successfully mounted %s", dir))
@@ -26,8 +26,10 @@ import (
     storage "k8s.io/api/storage/v1alpha1"
     meta "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    fakeclient "k8s.io/client-go/kubernetes/fake"
     "k8s.io/kubernetes/pkg/volume"
     "k8s.io/kubernetes/pkg/volume/csi/fake"
+    volumetest "k8s.io/kubernetes/pkg/volume/testing"
 )
 
 var (
@@ -68,7 +70,14 @@ func TestMounterGetPath(t *testing.T) {
 func TestMounterSetUp(t *testing.T) {
     plug, tmpDir := newTestPlugin(t)
     defer os.RemoveAll(tmpDir)
+    fakeClient := fakeclient.NewSimpleClientset()
+    host := volumetest.NewFakeVolumeHostWithNodeName(
+        tmpDir,
+        fakeClient,
+        nil,
+        "fakeNode",
+    )
+    plug.host = host
     pv := makeTestPV("test-pv", 10, testDriver, testVol)
     pvName := pv.GetName()
 

@@ -88,9 +97,11 @@ func TestMounterSetUp(t *testing.T) {
     csiMounter := mounter.(*csiMountMgr)
     csiMounter.csiClient = setupClient(t)
 
+    attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
+
     attachment := &storage.VolumeAttachment{
         ObjectMeta: meta.ObjectMeta{
-            Name: "pv-1234556775313",
+            Name: attachID,
         },
         Spec: storage.VolumeAttachmentSpec{
             NodeName: "test-node",
@@ -34,14 +34,13 @@ import (
 )
 
 const (
-    csiName       = "csi"
     csiPluginName = "kubernetes.io/csi"
 
     // TODO (vladimirvivien) implement a more dynamic way to discover
     // the unix domain socket path for each installed csi driver.
     // TODO (vladimirvivien) would be nice to name socket with a .sock extension
     // for consistency.
-    csiAddrTemplate = "/var/lib/kubelet/plugins/%v"
+    csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock"
     csiTimeout      = 15 * time.Second
     volNameSep      = "^"
 )
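The constant change above appends the socket file name to the per-driver plugin directory. A small, self-contained illustration of how the template expands is below; the driver name used here is made up, and the assumption that the template is filled with the driver name follows from the TODO comments above rather than from code shown in this diff.

```go
package main

import "fmt"

// csiAddrTemplate matches the updated constant above; formatting it with a
// driver name yields the full path of that driver's Unix domain socket.
const csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock"

func main() {
	// "com.example.csi-driver" is a hypothetical driver name.
	fmt.Println(fmt.Sprintf(csiAddrTemplate, "com.example.csi-driver"))
	// Prints: /var/lib/kubelet/plugins/com.example.csi-driver/csi.sock
}
```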
@@ -303,6 +303,9 @@ type VolumeHost interface {
 
     // Returns the labels on the node
     GetNodeLabels() (map[string]string, error)
+
+    // Returns the name of the node
+    GetNodeName() types.NodeName
 }
 
 // VolumePluginMgr tracks registered plugins.
@@ -53,6 +53,7 @@ type fakeVolumeHost struct {
     exec       mount.Exec
     writer     io.Writer
     nodeLabels map[string]string
+    nodeName   string
 }
 
 func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost {

@@ -69,6 +70,12 @@ func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interf
     return volHost
 }
 
+func NewFakeVolumeHostWithNodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost {
+    volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil)
+    volHost.nodeName = nodeName
+    return volHost
+}
+
 func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost {
     host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud}
     host.mounter = &mount.FakeMounter{}

@@ -177,6 +184,10 @@ func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) {
     return f.nodeLabels, nil
 }
 
+func (f *fakeVolumeHost) GetNodeName() types.NodeName {
+    return types.NodeName(f.nodeName)
+}
+
 func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin {
     if _, ok := config.OtherAttributes["fake-property"]; ok {
         return []VolumePlugin{
@@ -58,16 +58,25 @@ func buildControllerRoles() ([]rbac.ClusterRole, []rbac.ClusterRoleBinding) {
     // controllerRoleBindings is a slice of roles used for controllers
     controllerRoleBindings := []rbac.ClusterRoleBinding{}
 
-    addControllerRole(&controllerRoles, &controllerRoleBindings, rbac.ClusterRole{
-        ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "attachdetach-controller"},
-        Rules: []rbac.PolicyRule{
-            rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("persistentvolumes", "persistentvolumeclaims").RuleOrDie(),
-            rbac.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
-            rbac.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(),
-            rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
-            eventsRule(),
-        },
-    })
+    addControllerRole(&controllerRoles, &controllerRoleBindings, func() rbac.ClusterRole {
+        role := rbac.ClusterRole{
+            ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "attachdetach-controller"},
+            Rules: []rbac.PolicyRule{
+                rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("persistentvolumes", "persistentvolumeclaims").RuleOrDie(),
+                rbac.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
+                rbac.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(),
+                rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
+                eventsRule(),
+            },
+        }
+
+        if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
+            role.Rules = append(role.Rules, rbac.NewRule("get", "create", "delete", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie())
+        }
+
+        return role
+    }())
+
     addControllerRole(&controllerRoles, &controllerRoleBindings, rbac.ClusterRole{
         ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "clusterrole-aggregation-controller"},
         Rules: []rbac.PolicyRule{
@@ -143,6 +143,12 @@ func NodeRules() []rbac.PolicyRule {
         pvcStatusPolicyRule := rbac.NewRule("get", "update", "patch").Groups(legacyGroup).Resources("persistentvolumeclaims/status").RuleOrDie()
         nodePolicyRules = append(nodePolicyRules, pvcStatusPolicyRule)
     }
+
+    // CSI
+    if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
+        volAttachRule := rbac.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie()
+        nodePolicyRules = append(nodePolicyRules, volAttachRule)
+    }
     return nodePolicyRules
 }
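Taken together, the two policy hunks above give the attach/detach controller full management of `VolumeAttachment` objects and give nodes (kubelet) read access, both gated on `CSIPersistentVolume`. The sketch below restates the added rules using `k8s.io/api/rbac/v1` types rather than the in-tree rule builder; it is an approximate, standalone rendering, with `storage.k8s.io` assumed as the expansion of `storageGroup`.

```go
package main

import (
	"fmt"

	rbacv1 "k8s.io/api/rbac/v1"
)

func main() {
	// Rule appended to the attachdetach-controller ClusterRole when the
	// CSIPersistentVolume feature gate is enabled.
	controllerRule := rbacv1.PolicyRule{
		Verbs:     []string{"get", "create", "delete", "list", "watch"},
		APIGroups: []string{"storage.k8s.io"},
		Resources: []string{"volumeattachments"},
	}

	// Rule appended to the node (kubelet) policy rules under the same gate.
	nodeRule := rbacv1.PolicyRule{
		Verbs:     []string{"get"},
		APIGroups: []string{"storage.k8s.io"},
		Resources: []string{"volumeattachments"},
	}

	fmt.Printf("controller: %+v\nnode: %+v\n", controllerRule, nodeRule)
}
```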