mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
Merge pull request #101737 from Jiawei0227/migration_fix
Use CSI driver to determine unique name for migrated in-tree plugins
This commit is contained in:
commit
a5cf298a95
@ -57,6 +57,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csi"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
@ -725,6 +726,22 @@ func (adc *attachDetachController) processVolumeAttachments() error {
|
||||
err)
|
||||
continue
|
||||
}
|
||||
pluginName := plugin.GetPluginName()
|
||||
if adc.csiMigratedPluginManager.IsMigrationEnabledForPlugin(pluginName) {
|
||||
plugin, _ = adc.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
|
||||
// podNamespace is not needed here for Azurefile as the volumeName generated will be the same with or without podNamespace
|
||||
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(volumeSpec, "" /* podNamespace */, adc.intreeToCSITranslator)
|
||||
if err != nil {
|
||||
klog.Errorf(
|
||||
"Failed to translate intree volumeSpec to CSI volumeSpec for volume:%q, va.Name:%q, nodeName:%q: %v",
|
||||
*pvName,
|
||||
va.Name,
|
||||
nodeName,
|
||||
pluginName,
|
||||
err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
|
||||
if err != nil {
|
||||
klog.Errorf(
|
||||
|
@ -26,12 +26,23 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
kcache "k8s.io/client-go/tools/cache"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csi"
|
||||
"k8s.io/kubernetes/pkg/volume/gcepd"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
const (
|
||||
intreePDUniqueNamePrefix = "kubernetes.io/gce-pd/"
|
||||
csiPDUniqueNamePrefix = "kubernetes.io/csi/pd.csi.storage.gke.io^projects/UNSPECIFIED/zones/UNSPECIFIED/disks/"
|
||||
)
|
||||
|
||||
func Test_NewAttachDetachController_Positive(t *testing.T) {
|
||||
@ -348,6 +359,7 @@ type vaTest struct {
|
||||
vaName string
|
||||
vaNodeName string
|
||||
vaAttachStatus bool
|
||||
csiMigration bool
|
||||
expected_attaches map[string][]string
|
||||
expected_detaches map[string][]string
|
||||
}
|
||||
@ -386,6 +398,16 @@ func Test_ADC_VolumeAttachmentRecovery(t *testing.T) {
|
||||
expected_attaches: map[string][]string{},
|
||||
expected_detaches: map[string][]string{"mynode-1": {"vol1"}},
|
||||
},
|
||||
{
|
||||
testName: "CSI Migration",
|
||||
volName: "vol1",
|
||||
podNodeName: "mynode-1",
|
||||
pvName: "pv1",
|
||||
vaName: "va1",
|
||||
vaNodeName: "mynode-1",
|
||||
vaAttachStatus: false,
|
||||
csiMigration: true,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
volumeAttachmentRecoveryTestCase(t, tc)
|
||||
@ -396,7 +418,17 @@ func Test_ADC_VolumeAttachmentRecovery(t *testing.T) {
|
||||
func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
|
||||
fakeKubeClient := controllervolumetesting.CreateTestClient()
|
||||
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
|
||||
plugins := controllervolumetesting.CreateTestPlugin()
|
||||
var plugins []volume.VolumePlugin
|
||||
if tc.csiMigration {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, tc.csiMigration)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationGCE, tc.csiMigration)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InTreePluginGCEUnregister, tc.csiMigration)()
|
||||
|
||||
plugins = gcepd.ProbeVolumePlugins()
|
||||
plugins = append(plugins, csi.ProbeVolumePlugins()...)
|
||||
} else {
|
||||
plugins = controllervolumetesting.CreateTestPlugin()
|
||||
}
|
||||
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
|
||||
podInformer := informerFactory.Core().V1().Pods().Informer()
|
||||
pvInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
|
||||
@ -443,6 +475,32 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
|
||||
nodeInformer.GetIndexer().Add(&nodeToAdd)
|
||||
}
|
||||
|
||||
if tc.csiMigration {
|
||||
newNode := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: tc.podNodeName,
|
||||
Labels: map[string]string{
|
||||
"name": tc.podNodeName,
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
util.ControllerManagedAttachAnnotation: "true",
|
||||
},
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
VolumesAttached: []v1.AttachedVolume{
|
||||
{
|
||||
Name: v1.UniqueVolumeName(csiPDUniqueNamePrefix + tc.volName),
|
||||
DevicePath: "fake/path",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
_, err = adc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
|
||||
}
|
||||
nodeInformer.GetIndexer().Add(&newNode)
|
||||
}
|
||||
// Create and add objects requested by the test
|
||||
if tc.podName != "" {
|
||||
newPod := controllervolumetesting.NewPodWithVolume(tc.podName, tc.volName, tc.podNodeName)
|
||||
@ -461,7 +519,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
|
||||
pvInformer.GetIndexer().Add(newPv)
|
||||
}
|
||||
if tc.vaName != "" {
|
||||
newVa := controllervolumetesting.NewVolumeAttachment("va1", "pv1", "mynode-1", false)
|
||||
newVa := controllervolumetesting.NewVolumeAttachment(tc.vaName, tc.pvName, tc.vaNodeName, tc.vaAttachStatus)
|
||||
_, err = adc.kubeClient.StorageV1().VolumeAttachments().Create(context.TODO(), newVa, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Run failed with error. Failed to create a new volumeAttachment: <%v>", err)
|
||||
@ -497,8 +555,34 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
|
||||
go adc.desiredStateOfWorldPopulator.Run(stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
// Verify if expected attaches and detaches have happened
|
||||
testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
|
||||
if tc.csiMigration {
|
||||
verifyExpectedVolumeState(t, adc, tc)
|
||||
} else {
|
||||
// Verify if expected attaches and detaches have happened
|
||||
testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
|
||||
verifyAttachDetachCalls(t, testPlugin, tc)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func verifyExpectedVolumeState(t *testing.T, adc *attachDetachController, tc vaTest) {
|
||||
// Since csi migration is turned on, the attach state for the PV should be in CSI format.
|
||||
attachedState := adc.actualStateOfWorld.GetAttachState(
|
||||
v1.UniqueVolumeName(csiPDUniqueNamePrefix+tc.volName), types.NodeName(tc.vaNodeName))
|
||||
if attachedState != cache.AttachStateAttached {
|
||||
t.Fatalf("Expected attachedState %v, but it is %v", cache.AttachStateAttached, attachedState)
|
||||
}
|
||||
|
||||
// kubernetes.io/gce-pd/<volName> should not be marked when CSI Migration is on
|
||||
// so it should be in detach status
|
||||
attachedState = adc.actualStateOfWorld.GetAttachState(
|
||||
v1.UniqueVolumeName(intreePDUniqueNamePrefix+tc.volName), types.NodeName(tc.vaNodeName))
|
||||
if attachedState != cache.AttachStateDetached {
|
||||
t.Fatalf("Expected attachedState not to be %v, but it is %v", cache.AttachStateDetached, attachedState)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyAttachDetachCalls(t *testing.T, testPlugin *controllervolumetesting.TestPlugin, tc vaTest) {
|
||||
for tries := 0; tries <= 10; tries++ { // wait & try few times before failing the test
|
||||
expected_op_map := tc.expected_attaches
|
||||
plugin_map := testPlugin.GetAttachedVolumes()
|
||||
@ -543,5 +627,4 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
|
||||
if testPlugin.GetErrorEncountered() {
|
||||
t.Fatalf("Fatal error encountered in the testing volume plugin")
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -158,6 +158,16 @@ func CreateTestClient() *fake.Clientset {
|
||||
}
|
||||
attachVolumeToNode("lostVolumeName", nodeName)
|
||||
}
|
||||
fakeClient.AddReactor("update", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
updateAction := action.(core.UpdateAction)
|
||||
node := updateAction.GetObject().(*v1.Node)
|
||||
for index, n := range nodes.Items {
|
||||
if n.Name == node.Name {
|
||||
nodes.Items[index] = *node
|
||||
}
|
||||
}
|
||||
return true, updateAction.GetObject(), nil
|
||||
})
|
||||
fakeClient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
obj := &v1.NodeList{}
|
||||
obj.Items = append(obj.Items, nodes.Items...)
|
||||
|
Loading…
Reference in New Issue
Block a user