mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #121576 from huww98/opti-aswp
ad controller: optimize populateActualStateOfWorld
This commit is contained in:
commit
755b4e2bc4
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
@ -384,6 +385,8 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger
|
||||
|
||||
for _, node := range nodes {
|
||||
nodeName := types.NodeName(node.Name)
|
||||
volumesInUse := sets.New(node.Status.VolumesInUse...)
|
||||
|
||||
for _, attachedVolume := range node.Status.VolumesAttached {
|
||||
uniqueName := attachedVolume.Name
|
||||
// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
|
||||
@ -396,9 +399,13 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger
|
||||
logger.Error(err, "Failed to mark the volume as attached")
|
||||
continue
|
||||
}
|
||||
adc.processVolumesInUse(logger, nodeName, node.Status.VolumesInUse)
|
||||
adc.addNodeToDswp(node, types.NodeName(node.Name))
|
||||
inUse := volumesInUse.Has(uniqueName)
|
||||
err = adc.actualStateOfWorld.SetVolumeMountedByNode(logger, uniqueName, nodeName, inUse)
|
||||
if err != nil {
|
||||
logger.Error(err, "Failed to set volume mounted by node")
|
||||
}
|
||||
}
|
||||
adc.addNodeToDswp(node, types.NodeName(node.Name))
|
||||
}
|
||||
err = adc.processVolumeAttachments(logger)
|
||||
if err != nil {
|
||||
|
@ -26,7 +26,9 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
kcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
@ -73,44 +75,42 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
|
||||
func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
|
||||
// Arrange
|
||||
fakeKubeClient := controllervolumetesting.CreateTestClient()
|
||||
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
|
||||
podInformer := informerFactory.Core().V1().Pods()
|
||||
nodeInformer := informerFactory.Core().V1().Nodes()
|
||||
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
|
||||
pvInformer := informerFactory.Core().V1().PersistentVolumes()
|
||||
volumeAttachmentInformer := informerFactory.Storage().V1().VolumeAttachments()
|
||||
|
||||
adc := &attachDetachController{
|
||||
kubeClient: fakeKubeClient,
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
pvcsSynced: pvcInformer.Informer().HasSynced,
|
||||
pvLister: pvInformer.Lister(),
|
||||
pvsSynced: pvInformer.Informer().HasSynced,
|
||||
podLister: podInformer.Lister(),
|
||||
podsSynced: podInformer.Informer().HasSynced,
|
||||
nodeLister: nodeInformer.Lister(),
|
||||
nodesSynced: nodeInformer.Informer().HasSynced,
|
||||
volumeAttachmentLister: volumeAttachmentInformer.Lister(),
|
||||
volumeAttachmentSynced: volumeAttachmentInformer.Informer().HasSynced,
|
||||
cloud: nil,
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
adcObj, err := NewAttachDetachController(
|
||||
logger,
|
||||
fakeKubeClient,
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Storage().V1().CSINodes(),
|
||||
informerFactory.Storage().V1().CSIDrivers(),
|
||||
informerFactory.Storage().V1().VolumeAttachments(),
|
||||
nil, /* cloud */
|
||||
controllervolumetesting.CreateTestPlugin(),
|
||||
nil, /* prober */
|
||||
false,
|
||||
5*time.Second,
|
||||
DefaultTimerConfig,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
|
||||
}
|
||||
adc := adcObj.(*attachDetachController)
|
||||
|
||||
// Act
|
||||
plugins := controllervolumetesting.CreateTestPlugin()
|
||||
var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
|
||||
informerFactory.Start(ctx.Done())
|
||||
informerFactory.WaitForCacheSync(ctx.Done())
|
||||
|
||||
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
|
||||
t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
|
||||
}
|
||||
|
||||
adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
|
||||
adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
err := adc.populateActualStateOfWorld(logger)
|
||||
err = adc.populateActualStateOfWorld(logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
|
||||
}
|
||||
@ -128,11 +128,22 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
|
||||
|
||||
for _, node := range nodes {
|
||||
nodeName := types.NodeName(node.Name)
|
||||
inUseVolumes := sets.New(node.Status.VolumesInUse...)
|
||||
allAttachedVolumes := map[v1.UniqueVolumeName]cache.AttachedVolume{}
|
||||
for _, v := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
|
||||
allAttachedVolumes[v.VolumeName] = v
|
||||
}
|
||||
|
||||
for _, attachedVolume := range node.Status.VolumesAttached {
|
||||
attachedState := adc.actualStateOfWorld.GetAttachState(attachedVolume.Name, nodeName)
|
||||
if attachedState != cache.AttachStateAttached {
|
||||
t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name)
|
||||
}
|
||||
inUse := inUseVolumes.Has(attachedVolume.Name)
|
||||
mounted := allAttachedVolumes[attachedVolume.Name].MountedByNode
|
||||
if mounted != inUse {
|
||||
t.Fatalf("Node %s, volume %s MountedByNode %v unexpected", nodeName, attachedVolume.Name, mounted)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -152,6 +163,84 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPopulateActualStateOfWorld(b *testing.B) {
|
||||
// Arrange
|
||||
fakeKubeClient := fake.NewSimpleClientset()
|
||||
|
||||
// populate 10000 nodes, each with 100 volumes
|
||||
for i := 0; i < 10000; i++ {
|
||||
nodeName := fmt.Sprintf("node-%d", i)
|
||||
node := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: nodeName,
|
||||
Labels: map[string]string{
|
||||
"name": nodeName,
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
util.ControllerManagedAttachAnnotation: "true",
|
||||
},
|
||||
},
|
||||
}
|
||||
for j := 0; j < 100; j++ {
|
||||
volumeName := v1.UniqueVolumeName(fmt.Sprintf("test-volume/vol-%d-%d", i, j))
|
||||
node.Status.VolumesAttached = append(node.Status.VolumesAttached, v1.AttachedVolume{
|
||||
Name: volumeName,
|
||||
DevicePath: fmt.Sprintf("/dev/disk/by-id/vol-%d-%d", i, j),
|
||||
})
|
||||
node.Status.VolumesInUse = append(node.Status.VolumesInUse, volumeName)
|
||||
_, err := fakeKubeClient.CoreV1().PersistentVolumes().Create(context.Background(), &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("vol-%d-%d", i, j),
|
||||
},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
b.Fatalf("failed to create PV: %v", err)
|
||||
}
|
||||
}
|
||||
_, err := fakeKubeClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
b.Fatalf("failed to create node: %v", err)
|
||||
}
|
||||
}
|
||||
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
|
||||
|
||||
logger, ctx := ktesting.NewTestContext(b)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
adcObj, err := NewAttachDetachController(
|
||||
logger,
|
||||
fakeKubeClient,
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Storage().V1().CSINodes(),
|
||||
informerFactory.Storage().V1().CSIDrivers(),
|
||||
informerFactory.Storage().V1().VolumeAttachments(),
|
||||
nil, /* cloud */
|
||||
nil, /* plugins */
|
||||
nil, /* prober */
|
||||
false,
|
||||
5*time.Second,
|
||||
DefaultTimerConfig,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
|
||||
}
|
||||
adc := adcObj.(*attachDetachController)
|
||||
|
||||
// Act
|
||||
informerFactory.Start(ctx.Done())
|
||||
informerFactory.WaitForCacheSync(ctx.Done())
|
||||
|
||||
b.ResetTimer()
|
||||
err = adc.populateActualStateOfWorld(logger)
|
||||
if err != nil {
|
||||
b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_AttachDetachControllerRecovery(t *testing.T) {
|
||||
attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{})
|
||||
newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
|
||||
|
@ -155,8 +155,9 @@ func CreateTestClient() *fake.Clientset {
|
||||
// We want also the "mynode" node since all the testing pods live there
|
||||
nodeName = nodeNamePrefix
|
||||
}
|
||||
attachVolumeToNode(nodes, "lostVolumeName", nodeName)
|
||||
attachVolumeToNode(nodes, "lostVolumeName", nodeName, false)
|
||||
}
|
||||
attachVolumeToNode(nodes, "inUseVolume", nodeNamePrefix, true)
|
||||
fakeClient.AddReactor("update", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
updateAction := action.(core.UpdateAction)
|
||||
node := updateAction.GetObject().(*v1.Node)
|
||||
@ -312,21 +313,18 @@ func NewNFSPV(pvName, volumeName string) *v1.PersistentVolume {
|
||||
}
|
||||
}
|
||||
|
||||
func attachVolumeToNode(nodes *v1.NodeList, volumeName, nodeName string) {
|
||||
func attachVolumeToNode(nodes *v1.NodeList, volumeName, nodeName string, inUse bool) {
|
||||
// if nodeName exists, get the object.. if not create node object
|
||||
var node *v1.Node
|
||||
found := false
|
||||
nodes.Size()
|
||||
for i := range nodes.Items {
|
||||
curNode := nodes.Items[i]
|
||||
curNode := &nodes.Items[i]
|
||||
if curNode.ObjectMeta.Name == nodeName {
|
||||
node = &curNode
|
||||
found = true
|
||||
node = curNode
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
node = &v1.Node{
|
||||
if node == nil {
|
||||
nodes.Items = append(nodes.Items, v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: nodeName,
|
||||
Labels: map[string]string{
|
||||
@ -336,24 +334,19 @@ func attachVolumeToNode(nodes *v1.NodeList, volumeName, nodeName string) {
|
||||
util.ControllerManagedAttachAnnotation: "true",
|
||||
},
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
VolumesAttached: []v1.AttachedVolume{
|
||||
{
|
||||
Name: v1.UniqueVolumeName(TestPluginName + "/" + volumeName),
|
||||
DevicePath: "fake/path",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
volumeAttached := v1.AttachedVolume{
|
||||
Name: v1.UniqueVolumeName(TestPluginName + "/" + volumeName),
|
||||
DevicePath: "fake/path",
|
||||
}
|
||||
node.Status.VolumesAttached = append(node.Status.VolumesAttached, volumeAttached)
|
||||
})
|
||||
node = &nodes.Items[len(nodes.Items)-1]
|
||||
}
|
||||
uniqueVolumeName := v1.UniqueVolumeName(TestPluginName + "/" + volumeName)
|
||||
volumeAttached := v1.AttachedVolume{
|
||||
Name: uniqueVolumeName,
|
||||
DevicePath: "fake/path",
|
||||
}
|
||||
node.Status.VolumesAttached = append(node.Status.VolumesAttached, volumeAttached)
|
||||
|
||||
nodes.Items = append(nodes.Items, *node)
|
||||
if inUse {
|
||||
node.Status.VolumesInUse = append(node.Status.VolumesInUse, uniqueVolumeName)
|
||||
}
|
||||
}
|
||||
|
||||
type TestPlugin struct {
|
||||
|
Loading…
Reference in New Issue
Block a user