mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 11:13:48 +00:00
Merge pull request #124439 from bells17/csi-translation-lib-structured-and-contextual-logging
Migrate k8s.io/csi-translation-lib/.* to structured logging
This commit is contained in:
commit
113b12c6fb
@ -136,6 +136,7 @@ linters-settings: # please keep this alphabetized
|
||||
contextual k8s.io/component-helpers/.*
|
||||
contextual k8s.io/cri-api/.*
|
||||
contextual k8s.io/cri-client/.*
|
||||
contextual k8s.io/csi-translation-lib/.*
|
||||
contextual k8s.io/dynamic-resource-allocation/.*
|
||||
contextual k8s.io/endpointslice/.*
|
||||
contextual k8s.io/kms/.*
|
||||
|
@ -182,6 +182,7 @@ linters-settings: # please keep this alphabetized
|
||||
contextual k8s.io/component-helpers/.*
|
||||
contextual k8s.io/cri-api/.*
|
||||
contextual k8s.io/cri-client/.*
|
||||
contextual k8s.io/csi-translation-lib/.*
|
||||
contextual k8s.io/dynamic-resource-allocation/.*
|
||||
contextual k8s.io/endpointslice/.*
|
||||
contextual k8s.io/kms/.*
|
||||
|
@ -185,6 +185,7 @@ linters-settings: # please keep this alphabetized
|
||||
contextual k8s.io/component-helpers/.*
|
||||
contextual k8s.io/cri-api/.*
|
||||
contextual k8s.io/cri-client/.*
|
||||
contextual k8s.io/csi-translation-lib/.*
|
||||
contextual k8s.io/dynamic-resource-allocation/.*
|
||||
contextual k8s.io/endpointslice/.*
|
||||
contextual k8s.io/kms/.*
|
||||
|
@ -32,6 +32,7 @@ contextual k8s.io/client-go/tools/record/.*
|
||||
contextual k8s.io/component-helpers/.*
|
||||
contextual k8s.io/cri-api/.*
|
||||
contextual k8s.io/cri-client/.*
|
||||
contextual k8s.io/csi-translation-lib/.*
|
||||
contextual k8s.io/dynamic-resource-allocation/.*
|
||||
contextual k8s.io/endpointslice/.*
|
||||
contextual k8s.io/kms/.*
|
||||
|
@ -711,7 +711,7 @@ func (adc *attachDetachController) processVolumeAttachments(logger klog.Logger)
|
||||
// PV is migrated and should be handled by the CSI plugin instead of the in-tree one
|
||||
plugin, _ = adc.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
|
||||
// podNamespace is not needed here for Azurefile as the volumeName generated will be the same with or without podNamespace
|
||||
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(volumeSpec, "" /* podNamespace */, adc.intreeToCSITranslator)
|
||||
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(logger, volumeSpec, "" /* podNamespace */, adc.intreeToCSITranslator)
|
||||
if err != nil {
|
||||
logger.Error(err, "Failed to translate intree volumeSpec to CSI volumeSpec for volume", "node", klog.KRef("", string(nodeName)), "inTreePluginName", inTreePluginName, "vaName", va.Name, "PV", klog.KRef("", *pvName))
|
||||
continue
|
||||
|
@ -81,7 +81,7 @@ func CreateVolumeSpec(logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, node
|
||||
err)
|
||||
}
|
||||
|
||||
volumeSpec, err = translateInTreeSpecToCSIIfNeeded(volumeSpec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
|
||||
volumeSpec, err = translateInTreeSpecToCSIIfNeeded(logger, volumeSpec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"error performing CSI migration checks and translation for PVC %q/%q: %v",
|
||||
@ -100,7 +100,7 @@ func CreateVolumeSpec(logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, node
|
||||
clonedPodVolume := podVolume.DeepCopy()
|
||||
|
||||
origspec := volume.NewSpecFromVolume(clonedPodVolume)
|
||||
spec, err := translateInTreeSpecToCSIIfNeeded(origspec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
|
||||
spec, err := translateInTreeSpecToCSIIfNeeded(logger, origspec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"error performing CSI migration checks and translation for inline volume %q: %v",
|
||||
@ -241,7 +241,7 @@ func ProcessPodVolumes(logger klog.Logger, pod *v1.Pod, addVolumes bool, desired
|
||||
return
|
||||
}
|
||||
|
||||
func translateInTreeSpecToCSIIfNeeded(spec *volume.Spec, nodeName types.NodeName, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator, podNamespace string) (*volume.Spec, error) {
|
||||
func translateInTreeSpecToCSIIfNeeded(logger klog.Logger, spec *volume.Spec, nodeName types.NodeName, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator, podNamespace string) (*volume.Spec, error) {
|
||||
translatedSpec := spec
|
||||
migratable, err := csiMigratedPluginManager.IsMigratable(spec)
|
||||
if err != nil {
|
||||
@ -256,7 +256,7 @@ func translateInTreeSpecToCSIIfNeeded(spec *volume.Spec, nodeName types.NodeName
|
||||
return nil, err
|
||||
}
|
||||
if migratable && migrationSupportedOnNode {
|
||||
translatedSpec, err = csimigration.TranslateInTreeSpecToCSI(spec, podNamespace, csiTranslator)
|
||||
translatedSpec, err = csimigration.TranslateInTreeSpecToCSI(logger, spec, podNamespace, csiTranslator)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1648,7 +1648,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
kl.warnCgroupV1Usage()
|
||||
|
||||
// Start volume manager
|
||||
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
|
||||
go kl.volumeManager.Run(ctx, kl.sourcesReady)
|
||||
|
||||
if kl.kubeClient != nil {
|
||||
// Start two go-routines to update the status.
|
||||
|
@ -2997,12 +2997,6 @@ func simulateVolumeInUseUpdate(
|
||||
}
|
||||
}
|
||||
|
||||
func runVolumeManager(kubelet *Kubelet) chan struct{} {
|
||||
stopCh := make(chan struct{})
|
||||
go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
|
||||
return stopCh
|
||||
}
|
||||
|
||||
// dirExists returns true if the path exists and represents a directory.
|
||||
func dirExists(path string) bool {
|
||||
s, err := os.Stat(path)
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
)
|
||||
|
||||
func TestListVolumesForPod(t *testing.T) {
|
||||
@ -78,8 +79,9 @@ func TestListVolumesForPod(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
kubelet.podManager.SetPods([]*v1.Pod{pod})
|
||||
err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
|
||||
@ -196,8 +198,9 @@ func TestPodVolumesExist(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
kubelet.podManager.SetPods(pods)
|
||||
for _, pod := range pods {
|
||||
@ -255,8 +258,9 @@ func TestPodVolumeDeadlineAttachAndMount(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
kubelet.podManager.SetPods(pods)
|
||||
for _, pod := range pods {
|
||||
@ -316,8 +320,9 @@ func TestPodVolumeDeadlineUnmount(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
kubelet.podManager.SetPods(pods)
|
||||
for i, pod := range pods {
|
||||
@ -369,8 +374,9 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
kubelet.podManager.SetPods([]*v1.Pod{pod})
|
||||
err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
|
||||
@ -428,8 +434,9 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
// Add pod
|
||||
kubelet.podManager.SetPods([]*v1.Pod{pod})
|
||||
@ -534,15 +541,16 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
kubelet.podManager.SetPods([]*v1.Pod{pod})
|
||||
|
||||
// Fake node status update
|
||||
go simulateVolumeInUseUpdate(
|
||||
v1.UniqueVolumeName("fake/fake-device"),
|
||||
stopCh,
|
||||
tCtx.Done(),
|
||||
kubelet.volumeManager)
|
||||
|
||||
assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod))
|
||||
@ -618,8 +626,9 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
stopCh := runVolumeManager(kubelet)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
|
||||
|
||||
// Add pod
|
||||
kubelet.podManager.SetPods([]*v1.Pod{pod})
|
||||
@ -627,7 +636,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
|
||||
// Fake node status update
|
||||
go simulateVolumeInUseUpdate(
|
||||
v1.UniqueVolumeName("fake/fake-device"),
|
||||
stopCh,
|
||||
tCtx.Done(),
|
||||
kubelet.volumeManager)
|
||||
|
||||
// Verify volumes attached
|
||||
|
@ -50,7 +50,7 @@ import (
|
||||
// if it has volumes. It also verifies that the pods in the desired state of the
|
||||
// world cache still exist, if not, it removes them.
|
||||
type DesiredStateOfWorldPopulator interface {
|
||||
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
|
||||
Run(ctx context.Context, sourcesReady config.SourcesReady)
|
||||
|
||||
// ReprocessPod sets value for the specified pod in processedPods
|
||||
// to false, forcing it to be reprocessed. This is required to enable
|
||||
@ -141,21 +141,22 @@ type processedPods struct {
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||
func (dswp *desiredStateOfWorldPopulator) Run(ctx context.Context, sourcesReady config.SourcesReady) {
|
||||
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
|
||||
klog.InfoS("Desired state populator starts to run")
|
||||
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Desired state populator starts to run")
|
||||
_ = wait.PollUntilContextCancel(ctx, dswp.loopSleepDuration, false, func(ctx context.Context) (bool, error) {
|
||||
done := sourcesReady.AllReady()
|
||||
dswp.populatorLoop()
|
||||
dswp.populatorLoop(ctx)
|
||||
return done, nil
|
||||
}, stopCh)
|
||||
})
|
||||
dswp.hasAddedPodsLock.Lock()
|
||||
if !dswp.hasAddedPods {
|
||||
klog.InfoS("Finished populating initial desired state of world")
|
||||
logger.Info("Finished populating initial desired state of world")
|
||||
dswp.hasAddedPods = true
|
||||
}
|
||||
dswp.hasAddedPodsLock.Unlock()
|
||||
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
|
||||
wait.UntilWithContext(ctx, dswp.populatorLoop, dswp.loopSleepDuration)
|
||||
}
|
||||
|
||||
func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
|
||||
@ -169,14 +170,14 @@ func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
|
||||
return dswp.hasAddedPods
|
||||
}
|
||||
|
||||
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
|
||||
dswp.findAndAddNewPods()
|
||||
func (dswp *desiredStateOfWorldPopulator) populatorLoop(ctx context.Context) {
|
||||
dswp.findAndAddNewPods(ctx)
|
||||
dswp.findAndRemoveDeletedPods()
|
||||
}
|
||||
|
||||
// Iterate through all pods and add to desired state of world if they don't
|
||||
// exist but should
|
||||
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
|
||||
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods(ctx context.Context) {
|
||||
// Map unique pod name to outer volume name to MountedVolume.
|
||||
mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
|
||||
for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
|
||||
@ -201,7 +202,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
|
||||
continue
|
||||
}
|
||||
|
||||
dswp.processPodVolumes(pod, mountedVolumesForPod)
|
||||
dswp.processPodVolumes(ctx, pod, mountedVolumesForPod)
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,12 +285,14 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
|
||||
// processPodVolumes processes the volumes in the given pod and adds them to the
|
||||
// desired state of the world.
|
||||
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
|
||||
ctx context.Context,
|
||||
pod *v1.Pod,
|
||||
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) {
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
uniquePodName := util.GetUniquePodName(pod)
|
||||
if dswp.podPreviouslyProcessed(uniquePodName) {
|
||||
return
|
||||
@ -302,14 +305,14 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
|
||||
for _, podVolume := range pod.Spec.Volumes {
|
||||
if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) {
|
||||
// Volume is not used in the pod, ignore it.
|
||||
klog.V(4).InfoS("Skipping unused volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
|
||||
logger.V(4).Info("Skipping unused volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
pvc, volumeSpec, volumeGidValue, err :=
|
||||
dswp.createVolumeSpec(podVolume, pod, mounts, devices)
|
||||
dswp.createVolumeSpec(logger, podVolume, pod, mounts, devices)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Error processing volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
|
||||
logger.Error(err, "Error processing volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
|
||||
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
|
||||
allVolumesAdded = false
|
||||
continue
|
||||
@ -319,11 +322,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
|
||||
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
|
||||
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue, seLinuxContainerContexts[podVolume.Name])
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
|
||||
logger.Error(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
|
||||
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
|
||||
allVolumesAdded = false
|
||||
} else {
|
||||
klog.V(4).InfoS("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
|
||||
logger.V(4).Info("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
|
||||
}
|
||||
|
||||
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniquePodName, mountedVolumesForPod)
|
||||
@ -456,7 +459,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
|
||||
// specified volume. It dereference any PVC to get PV objects, if needed.
|
||||
// Returns an error if unable to obtain the volume at this time.
|
||||
func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
|
||||
podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
|
||||
logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
|
||||
pvcSource := podVolume.VolumeSource.PersistentVolumeClaim
|
||||
isEphemeral := pvcSource == nil && podVolume.VolumeSource.Ephemeral != nil
|
||||
if isEphemeral {
|
||||
@ -469,7 +472,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
|
||||
}
|
||||
}
|
||||
if pvcSource != nil {
|
||||
klog.V(5).InfoS("Found PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName))
|
||||
logger.V(5).Info("Found PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName))
|
||||
// If podVolume is a PVC, fetch the real PV behind the claim
|
||||
pvc, err := dswp.getPVCExtractPV(
|
||||
pod.Namespace, pvcSource.ClaimName)
|
||||
@ -486,7 +489,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
|
||||
}
|
||||
}
|
||||
pvName, pvcUID := pvc.Spec.VolumeName, pvc.UID
|
||||
klog.V(5).InfoS("Found bound PV for PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName)
|
||||
logger.V(5).Info("Found bound PV for PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName)
|
||||
// Fetch actual PV object
|
||||
volumeSpec, volumeGidValue, err :=
|
||||
dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID)
|
||||
@ -497,13 +500,13 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
|
||||
pvcSource.ClaimName,
|
||||
err)
|
||||
}
|
||||
klog.V(5).InfoS("Extracted volumeSpec from bound PV and PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName, "volumeSpecName", volumeSpec.Name())
|
||||
logger.V(5).Info("Extracted volumeSpec from bound PV and PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName, "volumeSpecName", volumeSpec.Name())
|
||||
migratable, err := dswp.csiMigratedPluginManager.IsMigratable(volumeSpec)
|
||||
if err != nil {
|
||||
return nil, nil, "", err
|
||||
}
|
||||
if migratable {
|
||||
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(volumeSpec, pod.Namespace, dswp.intreeToCSITranslator)
|
||||
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(logger, volumeSpec, pod.Namespace, dswp.intreeToCSITranslator)
|
||||
if err != nil {
|
||||
return nil, nil, "", err
|
||||
}
|
||||
@ -539,7 +542,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
|
||||
return nil, nil, "", err
|
||||
}
|
||||
if migratable {
|
||||
spec, err = csimigration.TranslateInTreeSpecToCSI(spec, pod.Namespace, dswp.intreeToCSITranslator)
|
||||
spec, err = csimigration.TranslateInTreeSpecToCSI(logger, spec, pod.Namespace, dswp.intreeToCSITranslator)
|
||||
if err != nil {
|
||||
return nil, nil, "", err
|
||||
}
|
||||
|
@ -17,14 +17,11 @@ limitations under the License.
|
||||
package populator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"fmt"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
@ -46,6 +43,8 @@ import (
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -165,7 +164,8 @@ func TestFindAndAddNewPods_WithDifferentConditions(t *testing.T) {
|
||||
|
||||
dswp.hasAddedPods = tc.hasAddedPods
|
||||
// Action
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
// Verify
|
||||
podsInDSW := dswp.desiredStateOfWorld.GetPods()
|
||||
@ -204,7 +204,8 @@ func TestFindAndAddNewPods_WithReprocessPodAndVolumeRetrievalError(t *testing.T)
|
||||
|
||||
podName := util.GetUniquePodName(pod)
|
||||
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
if !dswp.podPreviouslyProcessed(podName) {
|
||||
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
|
||||
@ -212,7 +213,7 @@ func TestFindAndAddNewPods_WithReprocessPodAndVolumeRetrievalError(t *testing.T)
|
||||
pluginPVOmittingClient(dswp)
|
||||
|
||||
dswp.ReprocessPod(podName)
|
||||
dswp.findAndAddNewPods()
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
if !dswp.podPreviouslyProcessed(podName) {
|
||||
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
|
||||
@ -243,7 +244,8 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) {
|
||||
|
||||
podName := util.GetUniquePodName(pod)
|
||||
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
if dswp.podPreviouslyProcessed(podName) {
|
||||
t.Fatalf("The volumes for the specified pod: %s should not have been processed by the populator", podName)
|
||||
@ -485,7 +487,8 @@ func prepareDSWPWithPodPV(t *testing.T) (*desiredStateOfWorldPopulator, *fakePod
|
||||
|
||||
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name
|
||||
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
if !dswp.pods.processedPods[podName] {
|
||||
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
|
||||
@ -535,7 +538,7 @@ func TestFindAndRemoveNonattachableVolumes(t *testing.T) {
|
||||
}
|
||||
|
||||
fakeVolumePluginMgr, fakeVolumePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
||||
dswp, fakePodManager, fakesDSW, _, _ := createDswpWithVolumeWithCustomPluginMgr(t, pv, pvc, fakeVolumePluginMgr)
|
||||
dswp, fakePodManager, fakesDSW, _, _ := createDswpWithVolumeWithCustomPluginMgr(pv, pvc, fakeVolumePluginMgr)
|
||||
|
||||
// create pod
|
||||
containers := []v1.Container{
|
||||
@ -556,7 +559,8 @@ func TestFindAndRemoveNonattachableVolumes(t *testing.T) {
|
||||
|
||||
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name
|
||||
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
if !dswp.pods.processedPods[podName] {
|
||||
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
|
||||
@ -602,7 +606,8 @@ func TestEphemeralVolumeOwnerCheck(t *testing.T) {
|
||||
|
||||
podName := util.GetUniquePodName(pod)
|
||||
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
if dswp.pods.processedPods[podName] {
|
||||
t.Fatalf("%s should not have been processed by the populator", podName)
|
||||
}
|
||||
@ -656,7 +661,8 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t
|
||||
|
||||
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name
|
||||
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
if !dswp.pods.processedPods[podName] {
|
||||
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
|
||||
@ -758,10 +764,11 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) {
|
||||
}
|
||||
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
fakePodManager.AddPod(pod)
|
||||
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod)
|
||||
_, volumeSpec, _, err :=
|
||||
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
|
||||
// Assert
|
||||
if volumeSpec == nil || err != nil {
|
||||
@ -804,10 +811,11 @@ func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) {
|
||||
}
|
||||
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
fakePodManager.AddPod(pod)
|
||||
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod)
|
||||
_, volumeSpec, _, err :=
|
||||
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
|
||||
// Assert
|
||||
if volumeSpec == nil || err != nil {
|
||||
@ -850,10 +858,11 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) {
|
||||
}
|
||||
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "block-bound", containers)
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
fakePodManager.AddPod(pod)
|
||||
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod)
|
||||
_, volumeSpec, _, err :=
|
||||
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
|
||||
// Assert
|
||||
if volumeSpec == nil || err != nil {
|
||||
@ -896,10 +905,11 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) {
|
||||
}
|
||||
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
fakePodManager.AddPod(pod)
|
||||
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod)
|
||||
_, volumeSpec, _, err :=
|
||||
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
|
||||
// Assert
|
||||
if volumeSpec != nil || err == nil {
|
||||
@ -942,10 +952,11 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) {
|
||||
}
|
||||
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "block-bound", containers)
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
fakePodManager.AddPod(pod)
|
||||
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod)
|
||||
_, volumeSpec, _, err :=
|
||||
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
|
||||
|
||||
// Assert
|
||||
if volumeSpec != nil || err == nil {
|
||||
@ -1104,13 +1115,14 @@ func TestCheckVolumeFSResize(t *testing.T) {
|
||||
|
||||
fakePodManager.AddPod(pod)
|
||||
// Fill the dsw to contains volumes and pods.
|
||||
dswp.findAndAddNewPods()
|
||||
tCtx := ktesting.Init(t)
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
reconcileASW(fakeASW, fakeDSW, t)
|
||||
|
||||
func() {
|
||||
tc.resize(t, pv, pvc, dswp)
|
||||
|
||||
resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW, *pv.Spec.Capacity.Storage())
|
||||
resizeRequiredVolumes := reprocess(tCtx, dswp, uniquePodName, fakeDSW, fakeASW, *pv.Spec.Capacity.Storage())
|
||||
|
||||
tc.verify(t, resizeRequiredVolumes, uniqueVolumeName)
|
||||
}()
|
||||
@ -1279,8 +1291,9 @@ func TestCheckVolumeSELinux(t *testing.T) {
|
||||
|
||||
fakeVolumePluginMgr, plugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
||||
plugin.SupportsSELinux = tc.pluginSupportsSELinux
|
||||
dswp, fakePodManager, fakeDSW, _, _ := createDswpWithVolumeWithCustomPluginMgr(t, pv, pvc, fakeVolumePluginMgr)
|
||||
dswp, fakePodManager, fakeDSW, _, _ := createDswpWithVolumeWithCustomPluginMgr(pv, pvc, fakeVolumePluginMgr)
|
||||
|
||||
tCtx := ktesting.Init(t)
|
||||
var existingPod *v1.Pod
|
||||
if tc.existingContainerSELinuxOpts != nil {
|
||||
// Add existing pod + volume
|
||||
@ -1288,7 +1301,7 @@ func TestCheckVolumeSELinux(t *testing.T) {
|
||||
existingContainer.SecurityContext.SELinuxOptions = tc.existingContainerSELinuxOpts
|
||||
existingPod = createPodWithVolume("dswp-old-pod", "dswp-test-volume-name", "file-bound", []v1.Container{existingContainer})
|
||||
fakePodManager.AddPod(existingPod)
|
||||
dswp.findAndAddNewPods()
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
}
|
||||
|
||||
newContainer := container
|
||||
@ -1297,7 +1310,7 @@ func TestCheckVolumeSELinux(t *testing.T) {
|
||||
|
||||
// Act - add the new Pod
|
||||
fakePodManager.AddPod(newPod)
|
||||
dswp.findAndAddNewPods()
|
||||
dswp.findAndAddNewPods(tCtx)
|
||||
|
||||
// Assert
|
||||
|
||||
@ -1402,10 +1415,10 @@ func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *te
|
||||
}
|
||||
}
|
||||
|
||||
func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName,
|
||||
func reprocess(ctx context.Context, dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName,
|
||||
dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
|
||||
dswp.ReprocessPod(uniquePodName)
|
||||
dswp.findAndAddNewPods()
|
||||
dswp.findAndAddNewPods(ctx)
|
||||
return getResizeRequiredVolumes(dsw, asw, newSize)
|
||||
}
|
||||
|
||||
@ -1544,7 +1557,7 @@ func createEphemeralVolumeObjects(podName, volumeName string, owned bool, volume
|
||||
|
||||
func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) (*desiredStateOfWorldPopulator, kubepod.Manager, cache.DesiredStateOfWorld, *containertest.FakeRuntime, *fakePodStateProvider) {
|
||||
fakeVolumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
||||
dswp, fakePodManager, fakesDSW, fakeRuntime, fakeStateProvider := createDswpWithVolumeWithCustomPluginMgr(t, pv, pvc, fakeVolumePluginMgr)
|
||||
dswp, fakePodManager, fakesDSW, fakeRuntime, fakeStateProvider := createDswpWithVolumeWithCustomPluginMgr(pv, pvc, fakeVolumePluginMgr)
|
||||
return dswp, fakePodManager, fakesDSW, fakeRuntime, fakeStateProvider
|
||||
}
|
||||
|
||||
@ -1567,7 +1580,7 @@ func (p *fakePodStateProvider) ShouldPodRuntimeBeRemoved(uid kubetypes.UID) bool
|
||||
return ok
|
||||
}
|
||||
|
||||
func createDswpWithVolumeWithCustomPluginMgr(t *testing.T, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim,
|
||||
func createDswpWithVolumeWithCustomPluginMgr(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim,
|
||||
fakeVolumePluginMgr *volume.VolumePluginMgr) (*desiredStateOfWorldPopulator, kubepod.Manager, cache.DesiredStateOfWorld, *containertest.FakeRuntime, *fakePodStateProvider) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
|
||||
|
@ -39,6 +39,8 @@ import (
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
||||
@ -188,6 +190,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
|
||||
// Verifies there is are attach/mount/etc calls and no detach/unmount calls.
|
||||
func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
|
||||
// Arrange
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
intreeToCSITranslator := csitrans.New()
|
||||
node := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@ -254,7 +257,7 @@ func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
|
||||
}
|
||||
|
||||
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
||||
migratedSpec, err := csimigration.TranslateInTreeSpecToCSI(volumeSpec, pod.Namespace, intreeToCSITranslator)
|
||||
migratedSpec, err := csimigration.TranslateInTreeSpecToCSI(logger, volumeSpec, pod.Namespace, intreeToCSITranslator)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error while translating spec %v: %v", volumeSpec, err)
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ const (
|
||||
// this node and makes it so.
|
||||
type VolumeManager interface {
|
||||
// Starts the volume manager and all the asynchronous loops that it controls
|
||||
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
|
||||
Run(ctx context.Context, sourcesReady config.SourcesReady)
|
||||
|
||||
// WaitForAttachAndMount processes the volumes referenced in the specified
|
||||
// pod and blocks until they are all attached and mounted (reflected in
|
||||
@ -275,23 +275,23 @@ type volumeManager struct {
|
||||
intreeToCSITranslator csimigration.InTreeToCSITranslator
|
||||
}
|
||||
|
||||
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||
func (vm *volumeManager) Run(ctx context.Context, sourcesReady config.SourcesReady) {
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
if vm.kubeClient != nil {
|
||||
// start informer for CSIDriver
|
||||
go vm.volumePluginMgr.Run(stopCh)
|
||||
go vm.volumePluginMgr.Run(ctx.Done())
|
||||
}
|
||||
|
||||
go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
|
||||
go vm.desiredStateOfWorldPopulator.Run(ctx, sourcesReady)
|
||||
klog.V(2).InfoS("The desired_state_of_world populator starts")
|
||||
|
||||
klog.InfoS("Starting Kubelet Volume Manager")
|
||||
go vm.reconciler.Run(stopCh)
|
||||
go vm.reconciler.Run(ctx.Done())
|
||||
|
||||
metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
|
||||
|
||||
<-stopCh
|
||||
<-ctx.Done()
|
||||
klog.InfoS("Shutting down Kubelet Volume Manager")
|
||||
}
|
||||
|
||||
|
@ -44,7 +44,7 @@ func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManag
|
||||
}
|
||||
|
||||
// Run is not implemented
|
||||
func (f *FakeVolumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||
func (f *FakeVolumeManager) Run(ctx context.Context, sourcesReady config.SourcesReady) {
|
||||
}
|
||||
|
||||
// WaitForAttachAndMount is not implemented
|
||||
|
@ -25,8 +25,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/mount-utils"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubetypes "k8s.io/apimachinery/pkg/types"
|
||||
@ -44,6 +42,8 @@ import (
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/hostutil"
|
||||
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
"k8s.io/mount-utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -94,15 +94,17 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
|
||||
|
||||
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
|
||||
|
||||
stopCh := runVolumeManager(manager)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
|
||||
go manager.Run(tCtx, sourcesReady)
|
||||
|
||||
podManager.SetPods([]*v1.Pod{pod})
|
||||
|
||||
// Fake node status update
|
||||
go simulateVolumeInUseUpdate(
|
||||
v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
|
||||
stopCh,
|
||||
tCtx.Done(),
|
||||
manager)
|
||||
|
||||
err = manager.WaitForAttachAndMount(context.Background(), pod)
|
||||
@ -218,8 +220,10 @@ func TestWaitForAttachAndMountError(t *testing.T) {
|
||||
|
||||
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, nil)
|
||||
|
||||
stopCh := runVolumeManager(manager)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
|
||||
go manager.Run(tCtx, sourcesReady)
|
||||
|
||||
podManager.SetPods([]*v1.Pod{pod})
|
||||
|
||||
@ -250,15 +254,17 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
|
||||
|
||||
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
|
||||
|
||||
stopCh := runVolumeManager(manager)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
|
||||
go manager.Run(tCtx, sourcesReady)
|
||||
|
||||
podManager.SetPods([]*v1.Pod{pod})
|
||||
|
||||
// Fake node status update
|
||||
go simulateVolumeInUseUpdate(
|
||||
v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
|
||||
stopCh,
|
||||
tCtx.Done(),
|
||||
manager)
|
||||
|
||||
// delayed claim binding
|
||||
@ -338,15 +344,17 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
|
||||
|
||||
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
|
||||
|
||||
stopCh := runVolumeManager(manager)
|
||||
defer close(stopCh)
|
||||
tCtx := ktesting.Init(t)
|
||||
defer tCtx.Cancel("test has completed")
|
||||
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
|
||||
go manager.Run(tCtx, sourcesReady)
|
||||
|
||||
podManager.SetPods([]*v1.Pod{pod})
|
||||
|
||||
// Fake node status update
|
||||
go simulateVolumeInUseUpdate(
|
||||
v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
|
||||
stopCh,
|
||||
tCtx.Done(),
|
||||
manager)
|
||||
|
||||
err = manager.WaitForAttachAndMount(context.Background(), pod)
|
||||
@ -537,12 +545,3 @@ func delayClaimBecomesBound(
|
||||
}
|
||||
kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(context.TODO(), volumeClaim, metav1.UpdateOptions{})
|
||||
}
|
||||
|
||||
func runVolumeManager(manager VolumeManager) chan struct{} {
|
||||
stopCh := make(chan struct{})
|
||||
//readyCh := make(chan bool, 1)
|
||||
//readyCh <- true
|
||||
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
|
||||
go manager.Run(sourcesReady, stopCh)
|
||||
return stopCh
|
||||
}
|
||||
|
@ -46,8 +46,8 @@ type InTreeToCSITranslator interface {
|
||||
IsMigratableIntreePluginByName(inTreePluginName string) bool
|
||||
GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
|
||||
GetCSINameFromInTreeName(pluginName string) (string, error)
|
||||
TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
}
|
||||
|
||||
// CSILimits is a plugin that checks node volume limits.
|
||||
@ -325,7 +325,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
|
||||
return nil
|
||||
}
|
||||
// Do translation for the in-tree volume.
|
||||
translatedPV, err := pl.translator.TranslateInTreeInlineVolumeToCSI(vol, pod.Namespace)
|
||||
translatedPV, err := pl.translator.TranslateInTreeInlineVolumeToCSI(logger, vol, pod.Namespace)
|
||||
if err != nil || translatedPV == nil {
|
||||
return fmt.Errorf("converting volume(%s) from inline to csi: %w", vol.Name, err)
|
||||
}
|
||||
@ -382,7 +382,7 @@ func (pl *CSILimits) getCSIDriverInfo(logger klog.Logger, csiNode *storagev1.CSI
|
||||
return "", ""
|
||||
}
|
||||
|
||||
csiPV, err := pl.translator.TranslateInTreePVToCSI(pv)
|
||||
csiPV, err := pl.translator.TranslateInTreePVToCSI(logger, pv)
|
||||
if err != nil {
|
||||
logger.V(5).Info("Unable to translate in-tree volume to CSI", "err", err)
|
||||
return "", ""
|
||||
|
@ -118,7 +118,7 @@ type PodVolumes struct {
|
||||
type InTreeToCSITranslator interface {
|
||||
IsPVMigratable(pv *v1.PersistentVolume) bool
|
||||
GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
|
||||
TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
}
|
||||
|
||||
// SchedulerVolumeBinder is used by the scheduler VolumeBinding plugin to
|
||||
@ -674,7 +674,7 @@ func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings [
|
||||
return false, nil
|
||||
}
|
||||
|
||||
pv, err = b.tryTranslatePVToCSI(pv, csiNode)
|
||||
pv, err = b.tryTranslatePVToCSI(logger, pv, csiNode)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to translate pv to csi: %w", err)
|
||||
}
|
||||
@ -733,7 +733,7 @@ func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings [
|
||||
return false, fmt.Errorf("failed to get pv %q from cache: %w", pvc.Spec.VolumeName, err)
|
||||
}
|
||||
|
||||
pv, err = b.tryTranslatePVToCSI(pv, csiNode)
|
||||
pv, err = b.tryTranslatePVToCSI(logger, pv, csiNode)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -882,7 +882,7 @@ func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.Persist
|
||||
return true, false, err
|
||||
}
|
||||
|
||||
pv, err = b.tryTranslatePVToCSI(pv, csiNode)
|
||||
pv, err = b.tryTranslatePVToCSI(logger, pv, csiNode)
|
||||
if err != nil {
|
||||
return false, true, err
|
||||
}
|
||||
@ -1133,7 +1133,7 @@ func isPluginMigratedToCSIOnNode(pluginName string, csiNode *storagev1.CSINode)
|
||||
}
|
||||
|
||||
// tryTranslatePVToCSI will translate the in-tree PV to CSI if it meets the criteria. If not, it returns the unmodified in-tree PV.
|
||||
func (b *volumeBinder) tryTranslatePVToCSI(pv *v1.PersistentVolume, csiNode *storagev1.CSINode) (*v1.PersistentVolume, error) {
|
||||
func (b *volumeBinder) tryTranslatePVToCSI(logger klog.Logger, pv *v1.PersistentVolume, csiNode *storagev1.CSINode) (*v1.PersistentVolume, error) {
|
||||
if !b.translator.IsPVMigratable(pv) {
|
||||
return pv, nil
|
||||
}
|
||||
@ -1151,7 +1151,7 @@ func (b *volumeBinder) tryTranslatePVToCSI(pv *v1.PersistentVolume, csiNode *sto
|
||||
return pv, nil
|
||||
}
|
||||
|
||||
transPV, err := b.translator.TranslateInTreePVToCSI(pv)
|
||||
transPV, err := b.translator.TranslateInTreePVToCSI(logger, pv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not translate pv: %v", err)
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/component-base/featuregate"
|
||||
csilibplugins "k8s.io/csi-translation-lib/plugins"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
@ -123,20 +124,20 @@ func (pm PluginManager) IsMigratable(spec *volume.Spec) (bool, error) {
|
||||
// InTreeToCSITranslator performs translation of Volume sources for PV and Volume objects
|
||||
// from references to in-tree plugins to migrated CSI plugins
|
||||
type InTreeToCSITranslator interface {
|
||||
TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
}
|
||||
|
||||
// TranslateInTreeSpecToCSI translates a volume spec (either PV or inline volume)
|
||||
// supported by an in-tree plugin to CSI
|
||||
func TranslateInTreeSpecToCSI(spec *volume.Spec, podNamespace string, translator InTreeToCSITranslator) (*volume.Spec, error) {
|
||||
func TranslateInTreeSpecToCSI(logger klog.Logger, spec *volume.Spec, podNamespace string, translator InTreeToCSITranslator) (*volume.Spec, error) {
|
||||
var csiPV *v1.PersistentVolume
|
||||
var err error
|
||||
inlineVolume := false
|
||||
if spec.PersistentVolume != nil {
|
||||
csiPV, err = translator.TranslateInTreePVToCSI(spec.PersistentVolume)
|
||||
csiPV, err = translator.TranslateInTreePVToCSI(logger, spec.PersistentVolume)
|
||||
} else if spec.Volume != nil {
|
||||
csiPV, err = translator.TranslateInTreeInlineVolumeToCSI(spec.Volume, podNamespace)
|
||||
csiPV, err = translator.TranslateInTreeInlineVolumeToCSI(logger, spec.Volume, podNamespace)
|
||||
inlineVolume = true
|
||||
} else {
|
||||
err = errors.New("not a valid volume spec")
|
||||
|
@ -63,8 +63,8 @@ type InTreeToCSITranslator interface {
|
||||
IsMigratableIntreePluginByName(inTreePluginName string) bool
|
||||
GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
|
||||
GetCSINameFromInTreeName(pluginName string) (string, error)
|
||||
TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
}
|
||||
|
||||
var _ OperationGenerator = &operationGenerator{}
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -57,7 +58,7 @@ func NewAWSElasticBlockStoreCSITranslator() InTreePlugin {
|
||||
}
|
||||
|
||||
// TranslateInTreeStorageClassToCSI translates InTree EBS storage class parameters to CSI storage class
|
||||
func (t *awsElasticBlockStoreCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
func (t *awsElasticBlockStoreCSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
var (
|
||||
generatedTopologies []v1.TopologySelectorTerm
|
||||
params = map[string]string{}
|
||||
@ -100,7 +101,7 @@ func (t *awsElasticBlockStoreCSITranslator) TranslateInTreeStorageClassToCSI(sc
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a Volume with AWSElasticBlockStore set from in-tree
|
||||
// and converts the AWSElasticBlockStore source to a CSIPersistentVolumeSource
|
||||
func (t *awsElasticBlockStoreCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (t *awsElasticBlockStoreCSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil || volume.AWSElasticBlockStore == nil {
|
||||
return nil, fmt.Errorf("volume is nil or AWS EBS not defined on volume")
|
||||
}
|
||||
@ -135,7 +136,7 @@ func (t *awsElasticBlockStoreCSITranslator) TranslateInTreeInlineVolumeToCSI(vol
|
||||
|
||||
// TranslateInTreePVToCSI takes a PV with AWSElasticBlockStore set from in-tree
|
||||
// and converts the AWSElasticBlockStore source to a CSIPersistentVolumeSource
|
||||
func (t *awsElasticBlockStoreCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (t *awsElasticBlockStoreCSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
if pv == nil || pv.Spec.AWSElasticBlockStore == nil {
|
||||
return nil, fmt.Errorf("pv is nil or AWS EBS not defined on pv")
|
||||
}
|
||||
|
@ -23,6 +23,9 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
||||
storage "k8s.io/api/storage/v1"
|
||||
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -79,6 +82,7 @@ func TestKubernetesVolumeIDToEBSVolumeID(t *testing.T) {
|
||||
|
||||
func TestTranslateEBSInTreeStorageClassToCSI(t *testing.T) {
|
||||
translator := NewAWSElasticBlockStoreCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
@ -111,7 +115,7 @@ func TestTranslateEBSInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreeStorageClassToCSI(tc.sc)
|
||||
got, err := translator.TranslateInTreeStorageClassToCSI(logger, tc.sc)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -129,6 +133,7 @@ func TestTranslateEBSInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
func TestTranslateInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
translator := NewAWSElasticBlockStoreCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
@ -182,7 +187,7 @@ func TestTranslateInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(&v1.Volume{Name: "volume", VolumeSource: tc.volumeSource}, "")
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(logger, &v1.Volume{Name: "volume", VolumeSource: tc.volumeSource}, "")
|
||||
if err != nil && !tc.expErr {
|
||||
t.Fatalf("Did not expect error but got: %v", err)
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -60,7 +61,7 @@ func NewAzureDiskCSITranslator() InTreePlugin {
|
||||
}
|
||||
|
||||
// TranslateInTreeStorageClassToCSI translates InTree Azure Disk storage class parameters to CSI storage class
|
||||
func (t *azureDiskCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
func (t *azureDiskCSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
var (
|
||||
generatedTopologies []v1.TopologySelectorTerm
|
||||
params = map[string]string{}
|
||||
@ -96,7 +97,7 @@ func (t *azureDiskCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.St
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a Volume with AzureDisk set from in-tree
|
||||
// and converts the AzureDisk source to a CSIPersistentVolumeSource
|
||||
func (t *azureDiskCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (t *azureDiskCSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil || volume.AzureDisk == nil {
|
||||
return nil, fmt.Errorf("volume is nil or Azure Disk not defined on volume")
|
||||
}
|
||||
@ -140,7 +141,7 @@ func (t *azureDiskCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Vol
|
||||
|
||||
// TranslateInTreePVToCSI takes a PV with AzureDisk set from in-tree
|
||||
// and converts the AzureDisk source to a CSIPersistentVolumeSource
|
||||
func (t *azureDiskCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (t *azureDiskCSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
if pv == nil || pv.Spec.AzureDisk == nil {
|
||||
return nil, fmt.Errorf("pv is nil or Azure Disk source not defined on pv")
|
||||
}
|
||||
|
@ -24,6 +24,8 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
func TestIsManagedDisk(t *testing.T) {
|
||||
@ -99,6 +101,7 @@ func TestGetDiskName(t *testing.T) {
|
||||
func TestTranslateAzureDiskInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
sharedBlobDiskKind := corev1.AzureDedicatedBlobDisk
|
||||
translator := NewAzureDiskCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
@ -158,7 +161,7 @@ func TestTranslateAzureDiskInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(tc.volume, "")
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(logger, tc.volume, "")
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -175,6 +178,7 @@ func TestTranslateAzureDiskInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
|
||||
func TestTranslateAzureDiskInTreePVToCSI(t *testing.T) {
|
||||
translator := NewAzureDiskCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
sharedBlobDiskKind := corev1.AzureDedicatedBlobDisk
|
||||
cachingMode := corev1.AzureDataDiskCachingMode("cachingmode")
|
||||
@ -250,7 +254,7 @@ func TestTranslateAzureDiskInTreePVToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreePVToCSI(tc.volume)
|
||||
got, err := translator.TranslateInTreePVToCSI(logger, tc.volume)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -447,6 +451,7 @@ func TestTranslateTranslateCSIPVToInTree(t *testing.T) {
|
||||
|
||||
func TestTranslateInTreeStorageClassToCSI(t *testing.T) {
|
||||
translator := NewAzureDiskCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
tcs := []struct {
|
||||
name string
|
||||
@ -513,7 +518,7 @@ func TestTranslateInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
gotOptions, err := translator.TranslateInTreeStorageClassToCSI(tc.options)
|
||||
gotOptions, err := translator.TranslateInTreeStorageClassToCSI(logger, tc.options)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
|
@ -59,13 +59,13 @@ func NewAzureFileCSITranslator() InTreePlugin {
|
||||
}
|
||||
|
||||
// TranslateInTreeStorageClassToCSI translates InTree Azure File storage class parameters to CSI storage class
|
||||
func (t *azureFileCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
func (t *azureFileCSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
return sc, nil
|
||||
}
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a Volume with AzureFile set from in-tree
|
||||
// and converts the AzureFile source to a CSIPersistentVolumeSource
|
||||
func (t *azureFileCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (t *azureFileCSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil || volume.AzureFile == nil {
|
||||
return nil, fmt.Errorf("volume is nil or Azure File not defined on volume")
|
||||
}
|
||||
@ -73,7 +73,7 @@ func (t *azureFileCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Vol
|
||||
azureSource := volume.AzureFile
|
||||
accountName, err := getStorageAccountName(azureSource.SecretName)
|
||||
if err != nil {
|
||||
klog.V(5).Infof("getStorageAccountName(%s) returned with error: %v", azureSource.SecretName, err)
|
||||
logger.V(5).Info("getStorageAccountName returned with error", "secretName", azureSource.SecretName, "err", err)
|
||||
accountName = azureSource.SecretName
|
||||
}
|
||||
|
||||
@ -112,7 +112,7 @@ func (t *azureFileCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Vol
|
||||
|
||||
// TranslateInTreePVToCSI takes a PV with AzureFile set from in-tree
|
||||
// and converts the AzureFile source to a CSIPersistentVolumeSource
|
||||
func (t *azureFileCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (t *azureFileCSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
if pv == nil || pv.Spec.AzureFile == nil {
|
||||
return nil, fmt.Errorf("pv is nil or Azure File source not defined on pv")
|
||||
}
|
||||
@ -120,7 +120,7 @@ func (t *azureFileCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume)
|
||||
azureSource := pv.Spec.PersistentVolumeSource.AzureFile
|
||||
accountName, err := getStorageAccountName(azureSource.SecretName)
|
||||
if err != nil {
|
||||
klog.V(5).Infof("getStorageAccountName(%s) returned with error: %v", azureSource.SecretName, err)
|
||||
logger.V(5).Info("getStorageAccountName returned with error", "secretName", azureSource.SecretName, "err", err)
|
||||
accountName = azureSource.SecretName
|
||||
}
|
||||
resourceGroup := ""
|
||||
|
@ -23,6 +23,8 @@ import (
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@ -100,6 +102,7 @@ func TestGetFileShareInfo(t *testing.T) {
|
||||
|
||||
func TestTranslateAzureFileInTreeStorageClassToCSI(t *testing.T) {
|
||||
translator := NewAzureFileCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
@ -188,7 +191,7 @@ func TestTranslateAzureFileInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(tc.volume, tc.podNamespace)
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(logger, tc.volume, tc.podNamespace)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -205,6 +208,7 @@ func TestTranslateAzureFileInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
func TestTranslateAzureFileInTreePVToCSI(t *testing.T) {
|
||||
translator := NewAzureFileCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
secretNamespace := "secretnamespace"
|
||||
|
||||
@ -367,7 +371,7 @@ func TestTranslateAzureFileInTreePVToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreePVToCSI(tc.volume)
|
||||
got, err := translator.TranslateInTreePVToCSI(logger, tc.volume)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -78,7 +79,7 @@ func generateToplogySelectors(key string, values []string) []v1.TopologySelector
|
||||
}
|
||||
|
||||
// TranslateInTreeStorageClassToCSI translates InTree GCE storage class parameters to CSI storage class
|
||||
func (g *gcePersistentDiskCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
func (g *gcePersistentDiskCSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
var generatedTopologies []v1.TopologySelectorTerm
|
||||
|
||||
np := map[string]string{}
|
||||
@ -162,7 +163,7 @@ func backwardCompatibleAccessModes(ams []v1.PersistentVolumeAccessMode) []v1.Per
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a Volume with GCEPersistentDisk set from in-tree
|
||||
// and converts the GCEPersistentDisk source to a CSIPersistentVolumeSource
|
||||
func (g *gcePersistentDiskCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (g *gcePersistentDiskCSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil || volume.GCEPersistentDisk == nil {
|
||||
return nil, fmt.Errorf("volume is nil or GCE PD not defined on volume")
|
||||
}
|
||||
@ -208,7 +209,7 @@ func (g *gcePersistentDiskCSITranslator) TranslateInTreeInlineVolumeToCSI(volume
|
||||
|
||||
// TranslateInTreePVToCSI takes a PV with GCEPersistentDisk set from in-tree
|
||||
// and converts the GCEPersistentDisk source to a CSIPersistentVolumeSource
|
||||
func (g *gcePersistentDiskCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (g *gcePersistentDiskCSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
var volID string
|
||||
|
||||
if pv == nil || pv.Spec.GCEPersistentDisk == nil {
|
||||
|
@ -24,6 +24,8 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
func NewStorageClass(params map[string]string, allowedTopologies []v1.TopologySelectorTerm) *storage.StorageClass {
|
||||
@ -35,6 +37,7 @@ func NewStorageClass(params map[string]string, allowedTopologies []v1.TopologySe
|
||||
|
||||
func TestTranslatePDInTreeStorageClassToCSI(t *testing.T) {
|
||||
g := NewGCEPersistentDiskCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
tcs := []struct {
|
||||
name string
|
||||
@ -86,7 +89,7 @@ func TestTranslatePDInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
gotOptions, err := g.TranslateInTreeStorageClassToCSI(tc.options)
|
||||
gotOptions, err := g.TranslateInTreeStorageClassToCSI(logger, tc.options)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -266,7 +269,8 @@ func TestBackwardCompatibleAccessModes(t *testing.T) {
|
||||
|
||||
func TestInlineReadOnly(t *testing.T) {
|
||||
g := NewGCEPersistentDiskCSITranslator()
|
||||
pv, err := g.TranslateInTreeInlineVolumeToCSI(&v1.Volume{
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
pv, err := g.TranslateInTreeInlineVolumeToCSI(logger, &v1.Volume{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
||||
PDName: "foo",
|
||||
@ -298,6 +302,7 @@ func TestInlineReadOnly(t *testing.T) {
|
||||
|
||||
func TestTranslateInTreePVToCSIVolIDFmt(t *testing.T) {
|
||||
g := NewGCEPersistentDiskCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
pdName := "pd-name"
|
||||
tests := []struct {
|
||||
desc string
|
||||
@ -332,7 +337,7 @@ func TestTranslateInTreePVToCSIVolIDFmt(t *testing.T) {
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
translatedPV, err := g.TranslateInTreePVToCSI(&v1.PersistentVolume{
|
||||
translatedPV, err := g.TranslateInTreePVToCSI(logger, &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{tc.topologyLabelKey: tc.topologyLabelValue},
|
||||
},
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// InTreePlugin handles translations between CSI and in-tree sources in a PV
|
||||
@ -32,17 +33,17 @@ type InTreePlugin interface {
|
||||
|
||||
// TranslateInTreeStorageClassToCSI takes in-tree volume options
|
||||
// and translates them to a volume options consumable by CSI plugin
|
||||
TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error)
|
||||
TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storage.StorageClass) (*storage.StorageClass, error)
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a inline volume and will translate
|
||||
// the in-tree inline volume source to a CSIPersistentVolumeSource
|
||||
// A PV object containing the CSIPersistentVolumeSource in it's spec is returned
|
||||
// podNamespace is only needed for azurefile to fetch secret namespace, no need to be set for other plugins.
|
||||
TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
|
||||
|
||||
// TranslateInTreePVToCSI takes a persistent volume and will translate
|
||||
// the in-tree pv source to a CSI Source. The input persistent volume can be modified
|
||||
TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
|
||||
|
||||
// TranslateCSIPVToInTree takes a PV with a CSI PersistentVolume Source and will translate
|
||||
// it to a in-tree Persistent Volume Source for the in-tree volume
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -45,7 +46,7 @@ func NewOpenStackCinderCSITranslator() InTreePlugin {
|
||||
}
|
||||
|
||||
// TranslateInTreeStorageClassToCSI translates InTree Cinder storage class parameters to CSI storage class
|
||||
func (t *osCinderCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
func (t *osCinderCSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
var (
|
||||
params = map[string]string{}
|
||||
)
|
||||
@ -75,7 +76,7 @@ func (t *osCinderCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.Sto
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a Volume with Cinder set from in-tree
|
||||
// and converts the Cinder source to a CSIPersistentVolumeSource
|
||||
func (t *osCinderCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (t *osCinderCSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil || volume.Cinder == nil {
|
||||
return nil, fmt.Errorf("volume is nil or Cinder not defined on volume")
|
||||
}
|
||||
@ -105,7 +106,7 @@ func (t *osCinderCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volu
|
||||
|
||||
// TranslateInTreePVToCSI takes a PV with Cinder set from in-tree
|
||||
// and converts the Cinder source to a CSIPersistentVolumeSource
|
||||
func (t *osCinderCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (t *osCinderCSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
if pv == nil || pv.Spec.Cinder == nil {
|
||||
return nil, fmt.Errorf("pv is nil or Cinder not defined on pv")
|
||||
}
|
||||
|
@ -22,10 +22,13 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
func TestTranslateCinderInTreeStorageClassToCSI(t *testing.T) {
|
||||
translator := NewOpenStackCinderCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
@ -63,7 +66,7 @@ func TestTranslateCinderInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreeStorageClassToCSI(tc.sc)
|
||||
got, err := translator.TranslateInTreeStorageClassToCSI(logger, tc.sc)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
|
@ -19,9 +19,10 @@ package plugins
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storagev1 "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -39,7 +40,7 @@ func NewPortworxCSITranslator() InTreePlugin {
|
||||
|
||||
// TranslateInTreeStorageClassToCSI takes in-tree storage class used by in-tree plugin
|
||||
// and translates them to a storageclass consumable by CSI plugin
|
||||
func (p portworxCSITranslator) TranslateInTreeStorageClassToCSI(sc *storagev1.StorageClass) (*storagev1.StorageClass, error) {
|
||||
func (p portworxCSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storagev1.StorageClass) (*storagev1.StorageClass, error) {
|
||||
if sc == nil {
|
||||
return nil, fmt.Errorf("sc is nil")
|
||||
}
|
||||
@ -49,7 +50,7 @@ func (p portworxCSITranslator) TranslateInTreeStorageClassToCSI(sc *storagev1.St
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a inline volume and will translate
|
||||
// the in-tree inline volume source to a CSIPersistentVolumeSource
|
||||
func (p portworxCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (p portworxCSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil || volume.PortworxVolume == nil {
|
||||
return nil, fmt.Errorf("volume is nil or PortworxVolume not defined on volume")
|
||||
}
|
||||
@ -82,7 +83,7 @@ func (p portworxCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volum
|
||||
|
||||
// TranslateInTreePVToCSI takes a Portworx persistent volume and will translate
|
||||
// the in-tree pv source to a CSI Source
|
||||
func (p portworxCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (p portworxCSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
if pv == nil || pv.Spec.PortworxVolume == nil {
|
||||
return nil, fmt.Errorf("pv is nil or PortworxVolume not defined on pv")
|
||||
}
|
||||
|
@ -17,15 +17,19 @@ limitations under the License.
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"reflect"
|
||||
"testing"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
func TestTranslatePortworxInTreeStorageClassToCSI(t *testing.T) {
|
||||
translator := NewPortworxCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
name string
|
||||
inTreeSC *storage.StorageClass
|
||||
@ -71,7 +75,7 @@ func TestTranslatePortworxInTreeStorageClassToCSI(t *testing.T) {
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
result, err := translator.TranslateInTreeStorageClassToCSI(tc.inTreeSC)
|
||||
result, err := translator.TranslateInTreeStorageClassToCSI(logger, tc.inTreeSC)
|
||||
if err != nil && !tc.errorExp {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -86,6 +90,8 @@ func TestTranslatePortworxInTreeStorageClassToCSI(t *testing.T) {
|
||||
|
||||
func TestTranslatePortworxInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
translator := NewPortworxCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
inLine *v1.Volume
|
||||
@ -136,7 +142,7 @@ func TestTranslatePortworxInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
result, err := translator.TranslateInTreeInlineVolumeToCSI(tc.inLine, "ns")
|
||||
result, err := translator.TranslateInTreeInlineVolumeToCSI(logger, tc.inLine, "ns")
|
||||
if err != nil && !tc.errExpected {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -151,6 +157,7 @@ func TestTranslatePortworxInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
|
||||
func TestTranslatePortworxInTreePVToCSI(t *testing.T) {
|
||||
translator := NewPortworxCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
@ -234,7 +241,7 @@ func TestTranslatePortworxInTreePVToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
result, err := translator.TranslateInTreePVToCSI(tc.inTree)
|
||||
result, err := translator.TranslateInTreePVToCSI(logger, tc.inTree)
|
||||
if err != nil && !tc.errExpected {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ func NewvSphereCSITranslator() InTreePlugin {
|
||||
}
|
||||
|
||||
// TranslateInTreeStorageClassToCSI translates InTree vSphere storage class parameters to CSI storage class
|
||||
func (t *vSphereCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
func (t *vSphereCSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
if sc == nil {
|
||||
return nil, fmt.Errorf("sc is nil")
|
||||
}
|
||||
@ -102,7 +102,7 @@ func (t *vSphereCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.Stor
|
||||
case "iopslimit":
|
||||
params[paramIopslimit] = v
|
||||
default:
|
||||
klog.V(2).Infof("StorageClass parameter [name:%q, value:%q] is not supported", k, v)
|
||||
logger.V(2).Info("StorageClass parameter is not supported", "name", k, "value", v)
|
||||
}
|
||||
}
|
||||
|
||||
@ -124,7 +124,7 @@ func (t *vSphereCSITranslator) TranslateInTreeStorageClassToCSI(sc *storage.Stor
|
||||
|
||||
// TranslateInTreeInlineVolumeToCSI takes a Volume with VsphereVolume set from in-tree
|
||||
// and converts the VsphereVolume source to a CSIPersistentVolumeSource
|
||||
func (t *vSphereCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (t *vSphereCSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil || volume.VsphereVolume == nil {
|
||||
return nil, fmt.Errorf("volume is nil or VsphereVolume not defined on volume")
|
||||
}
|
||||
@ -154,7 +154,7 @@ func (t *vSphereCSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volum
|
||||
|
||||
// TranslateInTreePVToCSI takes a PV with VsphereVolume set from in-tree
|
||||
// and converts the VsphereVolume source to a CSIPersistentVolumeSource
|
||||
func (t *vSphereCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (t *vSphereCSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
if pv == nil || pv.Spec.VsphereVolume == nil {
|
||||
return nil, fmt.Errorf("pv is nil or VsphereVolume not defined on pv")
|
||||
}
|
||||
|
@ -24,10 +24,13 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
func TestTranslatevSphereInTreeStorageClassToCSI(t *testing.T) {
|
||||
translator := NewvSphereCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
topologySelectorTerm := v1.TopologySelectorTerm{MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: v1.LabelTopologyZone,
|
||||
@ -111,7 +114,7 @@ func TestTranslatevSphereInTreeStorageClassToCSI(t *testing.T) {
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreeStorageClassToCSI(tc.sc)
|
||||
got, err := translator.TranslateInTreeStorageClassToCSI(logger, tc.sc)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -291,6 +294,7 @@ func TestTranslateVSphereCSIPVToInTree(t *testing.T) {
|
||||
|
||||
func TestTranslateVSphereInTreePVToCSI(t *testing.T) {
|
||||
translator := NewvSphereCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
cases := []struct {
|
||||
name string
|
||||
intreePV *v1.PersistentVolume
|
||||
@ -462,7 +466,7 @@ func TestTranslateVSphereInTreePVToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreePVToCSI(tc.intreePV)
|
||||
got, err := translator.TranslateInTreePVToCSI(logger, tc.intreePV)
|
||||
if err != nil && !tc.expErr {
|
||||
t.Errorf("Did not expect error but got: %v", err)
|
||||
}
|
||||
@ -479,6 +483,7 @@ func TestTranslateVSphereInTreePVToCSI(t *testing.T) {
|
||||
|
||||
func TestTranslatevSphereInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
translator := NewvSphereCSITranslator()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
cases := []struct {
|
||||
name string
|
||||
inlinevolume *v1.Volume
|
||||
@ -523,7 +528,7 @@ func TestTranslatevSphereInTreeInlineVolumeToCSI(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Logf("Testing %v", tc.name)
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(tc.inlinevolume, "")
|
||||
got, err := translator.TranslateInTreeInlineVolumeToCSI(logger, tc.inlinevolume, "")
|
||||
if err == nil && tc.expErr {
|
||||
t.Errorf("Expected error, but did not get one.")
|
||||
continue
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
"k8s.io/csi-translation-lib/plugins"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -50,11 +51,11 @@ func New() CSITranslator {
|
||||
|
||||
// TranslateInTreeStorageClassToCSI takes in-tree Storage Class
|
||||
// and translates it to a set of parameters consumable by CSI plugin
|
||||
func (CSITranslator) TranslateInTreeStorageClassToCSI(inTreePluginName string, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
func (CSITranslator) TranslateInTreeStorageClassToCSI(logger klog.Logger, inTreePluginName string, sc *storage.StorageClass) (*storage.StorageClass, error) {
|
||||
newSC := sc.DeepCopy()
|
||||
for _, curPlugin := range inTreePlugins {
|
||||
if inTreePluginName == curPlugin.GetInTreePluginName() {
|
||||
return curPlugin.TranslateInTreeStorageClassToCSI(newSC)
|
||||
return curPlugin.TranslateInTreeStorageClassToCSI(logger, newSC)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("could not find in-tree storage class parameter translation logic for %#v", inTreePluginName)
|
||||
@ -63,13 +64,13 @@ func (CSITranslator) TranslateInTreeStorageClassToCSI(inTreePluginName string, s
|
||||
// TranslateInTreeInlineVolumeToCSI takes a inline volume and will translate
|
||||
// the in-tree volume source to a CSIPersistentVolumeSource (wrapped in a PV)
|
||||
// if the translation logic has been implemented.
|
||||
func (CSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
func (CSITranslator) TranslateInTreeInlineVolumeToCSI(logger klog.Logger, volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) {
|
||||
if volume == nil {
|
||||
return nil, fmt.Errorf("persistent volume was nil")
|
||||
}
|
||||
for _, curPlugin := range inTreePlugins {
|
||||
if curPlugin.CanSupportInline(volume) {
|
||||
pv, err := curPlugin.TranslateInTreeInlineVolumeToCSI(volume, podNamespace)
|
||||
pv, err := curPlugin.TranslateInTreeInlineVolumeToCSI(logger, volume, podNamespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -92,14 +93,14 @@ func (CSITranslator) TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podName
|
||||
// the in-tree source to a CSI Source if the translation logic
|
||||
// has been implemented. The input persistent volume will not
|
||||
// be modified
|
||||
func (CSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
func (CSITranslator) TranslateInTreePVToCSI(logger klog.Logger, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
|
||||
if pv == nil {
|
||||
return nil, errors.New("persistent volume was nil")
|
||||
}
|
||||
copiedPV := pv.DeepCopy()
|
||||
for _, curPlugin := range inTreePlugins {
|
||||
if curPlugin.CanSupport(copiedPV) {
|
||||
return curPlugin.TranslateInTreePVToCSI(copiedPV)
|
||||
return curPlugin.TranslateInTreePVToCSI(logger, copiedPV)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("could not find in-tree plugin translation logic for %#v", copiedPV.Name)
|
||||
|
@ -25,6 +25,8 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/csi-translation-lib/plugins"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -45,6 +47,7 @@ var (
|
||||
)
|
||||
|
||||
func TestTranslationStability(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
name string
|
||||
pv *v1.PersistentVolume
|
||||
@ -84,7 +87,7 @@ func TestTranslationStability(t *testing.T) {
|
||||
for _, test := range testCases {
|
||||
ctl := New()
|
||||
t.Logf("Testing %v", test.name)
|
||||
csiSource, err := ctl.TranslateInTreePVToCSI(test.pv)
|
||||
csiSource, err := ctl.TranslateInTreePVToCSI(logger, test.pv)
|
||||
if err != nil {
|
||||
t.Errorf("Error when translating to CSI: %v", err)
|
||||
}
|
||||
@ -99,6 +102,7 @@ func TestTranslationStability(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTopologyTranslation(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
name string
|
||||
key string
|
||||
@ -207,7 +211,7 @@ func TestTopologyTranslation(t *testing.T) {
|
||||
t.Logf("Testing %v", test.name)
|
||||
|
||||
// Translate to CSI PV and check translated node affinity
|
||||
newCSIPV, err := ctl.TranslateInTreePVToCSI(test.pv)
|
||||
newCSIPV, err := ctl.TranslateInTreePVToCSI(logger, test.pv)
|
||||
if err != nil {
|
||||
t.Errorf("Error when translating to CSI: %v", err)
|
||||
}
|
||||
@ -365,12 +369,13 @@ func makeTopology(key string, values ...string) *v1.NodeSelectorRequirement {
|
||||
func TestTranslateInTreeInlineVolumeToCSINameUniqueness(t *testing.T) {
|
||||
for driverName := range inTreePlugins {
|
||||
t.Run(driverName, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
ctl := New()
|
||||
vs1, err := generateUniqueVolumeSource(driverName)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't generate random source: %v", err)
|
||||
}
|
||||
pv1, err := ctl.TranslateInTreeInlineVolumeToCSI(&v1.Volume{
|
||||
pv1, err := ctl.TranslateInTreeInlineVolumeToCSI(logger, &v1.Volume{
|
||||
VolumeSource: vs1,
|
||||
}, "")
|
||||
if err != nil {
|
||||
@ -380,7 +385,7 @@ func TestTranslateInTreeInlineVolumeToCSINameUniqueness(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't generate random source: %v", err)
|
||||
}
|
||||
pv2, err := ctl.TranslateInTreeInlineVolumeToCSI(&v1.Volume{
|
||||
pv2, err := ctl.TranslateInTreeInlineVolumeToCSI(logger, &v1.Volume{
|
||||
VolumeSource: vs2,
|
||||
}, "")
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user