Merge pull request #80084 from bertinatto/cache_csinode

Add separate cache for CSINode
This commit is contained in:
Kubernetes Prow Robot 2019-08-02 04:53:52 -07:00 committed by GitHub
commit 73b1bcba0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 149 additions and 103 deletions

View File

@ -32,17 +32,19 @@ import (
// CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes // CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes
type CSIMaxVolumeLimitChecker struct { type CSIMaxVolumeLimitChecker struct {
pvInfo PersistentVolumeInfo csiNodeInfo CSINodeInfo
pvcInfo PersistentVolumeClaimInfo pvInfo PersistentVolumeInfo
scInfo StorageClassInfo pvcInfo PersistentVolumeClaimInfo
scInfo StorageClassInfo
randomVolumeIDPrefix string randomVolumeIDPrefix string
} }
// NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes // NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes
func NewCSIMaxVolumeLimitPredicate( func NewCSIMaxVolumeLimitPredicate(
pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, scInfo StorageClassInfo) FitPredicate { csiNodeInfo CSINodeInfo, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, scInfo StorageClassInfo) FitPredicate {
c := &CSIMaxVolumeLimitChecker{ c := &CSIMaxVolumeLimitChecker{
csiNodeInfo: csiNodeInfo,
pvInfo: pvInfo, pvInfo: pvInfo,
pvcInfo: pvcInfo, pvcInfo: pvcInfo,
scInfo: scInfo, scInfo: scInfo,
@ -51,6 +53,22 @@ func NewCSIMaxVolumeLimitPredicate(
return c.attachableLimitPredicate return c.attachableLimitPredicate
} }
func (c *CSIMaxVolumeLimitChecker) getVolumeLimits(nodeInfo *schedulernodeinfo.NodeInfo, csiNode *storagev1beta1.CSINode) map[v1.ResourceName]int64 {
// TODO: stop getting values from Node object in v1.18
nodeVolumeLimits := nodeInfo.VolumeLimits()
if csiNode != nil {
for i := range csiNode.Spec.Drivers {
d := csiNode.Spec.Drivers[i]
if d.Allocatable != nil && d.Allocatable.Count != nil {
// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object (v1.18)
k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
nodeVolumeLimits[k] = int64(*d.Allocatable.Count)
}
}
}
return nodeVolumeLimits
}
func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
// If the new pod doesn't have any volume attached to it, the predicate will always be true // If the new pod doesn't have any volume attached to it, the predicate will always be true
@ -62,8 +80,20 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
return true, nil, nil return true, nil, nil
} }
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
}
// If CSINode doesn't exist, the predicate may read the limits from Node object
csiNode, err := c.csiNodeInfo.GetCSINodeInfo(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default (2 releases)
klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
}
newVolumes := make(map[string]string) newVolumes := make(map[string]string)
if err := c.filterAttachableVolumes(nodeInfo, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { if err := c.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
return false, nil, err return false, nil, err
} }
@ -73,14 +103,14 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
} }
// If the node doesn't have volume limits, the predicate will always be true // If the node doesn't have volume limits, the predicate will always be true
nodeVolumeLimits := nodeInfo.VolumeLimits() nodeVolumeLimits := c.getVolumeLimits(nodeInfo, csiNode)
if len(nodeVolumeLimits) == 0 { if len(nodeVolumeLimits) == 0 {
return true, nil, nil return true, nil, nil
} }
attachedVolumes := make(map[string]string) attachedVolumes := make(map[string]string)
for _, existingPod := range nodeInfo.Pods() { for _, existingPod := range nodeInfo.Pods() {
if err := c.filterAttachableVolumes(nodeInfo, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil { if err := c.filterAttachableVolumes(csiNode, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
return false, nil, err return false, nil, err
} }
} }
@ -113,8 +143,7 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
} }
func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
nodeInfo *schedulernodeinfo.NodeInfo, volumes []v1.Volume, namespace string, result map[string]string) error { csiNode *storagev1beta1.CSINode, volumes []v1.Volume, namespace string, result map[string]string) error {
for _, vol := range volumes { for _, vol := range volumes {
// CSI volumes can only be used as persistent volumes // CSI volumes can only be used as persistent volumes
if vol.PersistentVolumeClaim == nil { if vol.PersistentVolumeClaim == nil {
@ -133,7 +162,6 @@ func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
continue continue
} }
csiNode := nodeInfo.CSINode()
driverName, volumeHandle := c.getCSIDriverInfo(csiNode, pvc) driverName, volumeHandle := c.getCSIDriverInfo(csiNode, pvc)
if driverName == "" || volumeHandle == "" { if driverName == "" || volumeHandle == "" {
klog.V(5).Infof("Could not find a CSI driver name or volume handle, not counting volume") klog.V(5).Infof("Could not find a CSI driver name or volume handle, not counting volume")

View File

@ -23,13 +23,13 @@ import (
"testing" "testing"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
csilibplugins "k8s.io/csi-translation-lib/plugins" csilibplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
) )
const ( const (
@ -437,11 +437,11 @@ func TestCSIVolumeCountPredicate(t *testing.T) {
// running attachable predicate tests with feature gate and limit present on nodes // running attachable predicate tests with feature gate and limit present on nodes
for _, test := range tests { for _, test := range tests {
t.Run(test.test, func(t *testing.T) { t.Run(test.test, func(t *testing.T) {
node := getNodeWithPodAndVolumeLimits(test.limitSource, test.existingPods, int64(test.maxVols), test.driverNames...) node, csiNode := getNodeWithPodAndVolumeLimits(test.limitSource, test.existingPods, int64(test.maxVols), test.driverNames...)
if test.migrationEnabled { if test.migrationEnabled {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)()
enableMigrationOnNode(node, csilibplugins.AWSEBSInTreePluginName) enableMigrationOnNode(csiNode, csilibplugins.AWSEBSInTreePluginName)
} else { } else {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)()
@ -452,7 +452,8 @@ func TestCSIVolumeCountPredicate(t *testing.T) {
expectedFailureReasons = []PredicateFailureReason{test.expectedFailureReason} expectedFailureReasons = []PredicateFailureReason{test.expectedFailureReason}
} }
pred := NewCSIMaxVolumeLimitPredicate(getFakeCSIPVInfo(test.filterName, test.driverNames...), pred := NewCSIMaxVolumeLimitPredicate(getFakeCSINodeInfo(csiNode),
getFakeCSIPVInfo(test.filterName, test.driverNames...),
getFakeCSIPVCInfo(test.filterName, "csi-sc", test.driverNames...), getFakeCSIPVCInfo(test.filterName, "csi-sc", test.driverNames...),
getFakeCSIStorageClassInfo("csi-sc", test.driverNames[0])) getFakeCSIStorageClassInfo("csi-sc", test.driverNames[0]))
@ -544,8 +545,7 @@ func getFakeCSIPVCInfo(volumeName, scName string, driverNames ...string) FakePer
return pvcInfos return pvcInfos
} }
func enableMigrationOnNode(nodeInfo *schedulernodeinfo.NodeInfo, pluginName string) { func enableMigrationOnNode(csiNode *storagev1beta1.CSINode, pluginName string) {
csiNode := nodeInfo.CSINode()
nodeInfoAnnotations := csiNode.GetAnnotations() nodeInfoAnnotations := csiNode.GetAnnotations()
if nodeInfoAnnotations == nil { if nodeInfoAnnotations == nil {
nodeInfoAnnotations = map[string]string{} nodeInfoAnnotations = map[string]string{}
@ -557,7 +557,6 @@ func enableMigrationOnNode(nodeInfo *schedulernodeinfo.NodeInfo, pluginName stri
nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
csiNode.Annotations = nodeInfoAnnotations csiNode.Annotations = nodeInfoAnnotations
nodeInfo.SetCSINode(csiNode)
} }
func getFakeCSIStorageClassInfo(scName, provisionerName string) FakeStorageClassInfo { func getFakeCSIStorageClassInfo(scName, provisionerName string) FakeStorageClassInfo {
@ -568,3 +567,10 @@ func getFakeCSIStorageClassInfo(scName, provisionerName string) FakeStorageClass
}, },
} }
} }
func getFakeCSINodeInfo(csiNode *storagev1beta1.CSINode) FakeCSINodeInfo {
if csiNode != nil {
return FakeCSINodeInfo(*csiNode)
}
return FakeCSINodeInfo{}
}

View File

@ -835,12 +835,14 @@ func TestVolumeCountConflicts(t *testing.T) {
// running attachable predicate tests without feature gate and no limit present on nodes // running attachable predicate tests without feature gate and no limit present on nodes
for _, test := range tests { for _, test := range tests {
os.Setenv(KubeMaxPDVols, strconv.Itoa(test.maxVols)) os.Setenv(KubeMaxPDVols, strconv.Itoa(test.maxVols))
node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
pred := NewMaxPDVolumeCountPredicate(test.filterName, pred := NewMaxPDVolumeCountPredicate(test.filterName,
getFakeCSINodeInfo(csiNode),
getFakeStorageClassInfo(test.filterName), getFakeStorageClassInfo(test.filterName),
getFakePVInfo(test.filterName), getFakePVInfo(test.filterName),
getFakePVCInfo(test.filterName)) getFakePVCInfo(test.filterName))
fits, reasons, err := pred(test.newPod, GetPredicateMetadata(test.newPod, nil), schedulernodeinfo.NewNodeInfo(test.existingPods...)) fits, reasons, err := pred(test.newPod, GetPredicateMetadata(test.newPod, nil), node)
if err != nil { if err != nil {
t.Errorf("[%s]%s: unexpected error: %v", test.filterName, test.test, err) t.Errorf("[%s]%s: unexpected error: %v", test.filterName, test.test, err)
} }
@ -856,8 +858,9 @@ func TestVolumeCountConflicts(t *testing.T) {
// running attachable predicate tests with feature gate and limit present on nodes // running attachable predicate tests with feature gate and limit present on nodes
for _, test := range tests { for _, test := range tests {
node := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
pred := NewMaxPDVolumeCountPredicate(test.filterName, pred := NewMaxPDVolumeCountPredicate(test.filterName,
getFakeCSINodeInfo(csiNode),
getFakeStorageClassInfo(test.filterName), getFakeStorageClassInfo(test.filterName),
getFakePVInfo(test.filterName), getFakePVInfo(test.filterName),
getFakePVCInfo(test.filterName)) getFakePVCInfo(test.filterName))
@ -1048,23 +1051,24 @@ func TestMaxVolumeFuncM4(t *testing.T) {
} }
} }
func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int64, driverNames ...string) *schedulernodeinfo.NodeInfo { func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int64, driverNames ...string) (*schedulernodeinfo.NodeInfo, *v1beta1.CSINode) {
nodeInfo := schedulernodeinfo.NewNodeInfo(pods...) nodeInfo := schedulernodeinfo.NewNodeInfo(pods...)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{},
},
}
var csiNode *v1beta1.CSINode
addLimitToNode := func() { addLimitToNode := func() {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{},
},
}
for _, driver := range driverNames { for _, driver := range driverNames {
node.Status.Allocatable[getVolumeLimitKey(driver)] = *resource.NewQuantity(limit, resource.DecimalSI) node.Status.Allocatable[getVolumeLimitKey(driver)] = *resource.NewQuantity(limit, resource.DecimalSI)
} }
nodeInfo.SetNode(node)
} }
createCSINode := func() *v1beta1.CSINode { initCSINode := func() {
return &v1beta1.CSINode{ csiNode = &v1beta1.CSINode{
ObjectMeta: metav1.ObjectMeta{Name: "csi-node-for-max-pd-test-1"}, ObjectMeta: metav1.ObjectMeta{Name: "csi-node-for-max-pd-test-1"},
Spec: v1beta1.CSINodeSpec{ Spec: v1beta1.CSINodeSpec{
Drivers: []v1beta1.CSINodeDriver{}, Drivers: []v1beta1.CSINodeDriver{},
@ -1072,8 +1076,8 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int
} }
} }
addLimitToCSINode := func(addLimits bool) { addDriversCSINode := func(addLimits bool) {
csiNode := createCSINode() initCSINode()
for _, driver := range driverNames { for _, driver := range driverNames {
driver := v1beta1.CSINodeDriver{ driver := v1beta1.CSINodeDriver{
Name: driver, Name: driver,
@ -1086,26 +1090,26 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int
} }
csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, driver) csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, driver)
} }
nodeInfo.SetCSINode(csiNode)
} }
switch limitSource { switch limitSource {
case "node": case "node":
addLimitToNode() addLimitToNode()
case "csinode": case "csinode":
addLimitToCSINode(true) addDriversCSINode(true)
case "both": case "both":
addLimitToNode() addLimitToNode()
addLimitToCSINode(true) addDriversCSINode(true)
case "csinode-with-no-limit": case "csinode-with-no-limit":
addLimitToCSINode(false) addDriversCSINode(false)
case "no-csi-driver": case "no-csi-driver":
csiNode := createCSINode() initCSINode()
nodeInfo.SetCSINode(csiNode)
default: default:
return nodeInfo // Do nothing.
} }
return nodeInfo
nodeInfo.SetNode(node)
return nodeInfo, csiNode
} }
func getVolumeLimitKey(filterType string) v1.ResourceName { func getVolumeLimitKey(filterType string) v1.ResourceName {

View File

@ -162,6 +162,11 @@ type NodeInfo interface {
GetNodeInfo(nodeID string) (*v1.Node, error) GetNodeInfo(nodeID string) (*v1.Node, error)
} }
// CSINodeInfo interface represents anything that can get CSINode object from node ID.
type CSINodeInfo interface {
GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
}
// PersistentVolumeInfo interface represents anything that can get persistent volume object by PV ID. // PersistentVolumeInfo interface represents anything that can get persistent volume object by PV ID.
type PersistentVolumeInfo interface { type PersistentVolumeInfo interface {
GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error)
@ -285,6 +290,7 @@ type MaxPDVolumeCountChecker struct {
filter VolumeFilter filter VolumeFilter
volumeLimitKey v1.ResourceName volumeLimitKey v1.ResourceName
maxVolumeFunc func(node *v1.Node) int maxVolumeFunc func(node *v1.Node) int
csiNodeInfo CSINodeInfo
pvInfo PersistentVolumeInfo pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo pvcInfo PersistentVolumeClaimInfo
scInfo StorageClassInfo scInfo StorageClassInfo
@ -316,8 +322,8 @@ type VolumeFilter struct {
// The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume // The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
// types, counts the number of unique volumes, and rejects the new pod if it would place the total count over // types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
// the maximum. // the maximum.
func NewMaxPDVolumeCountPredicate( func NewMaxPDVolumeCountPredicate(filterName string, csiNodeInfo CSINodeInfo, scInfo StorageClassInfo,
filterName string, scInfo StorageClassInfo, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) FitPredicate { pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) FitPredicate {
var filter VolumeFilter var filter VolumeFilter
var volumeLimitKey v1.ResourceName var volumeLimitKey v1.ResourceName
@ -345,6 +351,7 @@ func NewMaxPDVolumeCountPredicate(
filter: filter, filter: filter,
volumeLimitKey: volumeLimitKey, volumeLimitKey: volumeLimitKey,
maxVolumeFunc: getMaxVolumeFunc(filterName), maxVolumeFunc: getMaxVolumeFunc(filterName),
csiNodeInfo: csiNodeInfo,
pvInfo: pvInfo, pvInfo: pvInfo,
pvcInfo: pvcInfo, pvcInfo: pvcInfo,
scInfo: scInfo, scInfo: scInfo,
@ -492,8 +499,20 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata,
return true, nil, nil return true, nil, nil
} }
// If a plugin has been migrated to a CSI driver, defer to the CSI predicate. node := nodeInfo.Node()
if c.filter.IsMigrated(nodeInfo.CSINode()) { if node == nil {
return false, nil, fmt.Errorf("node not found")
}
csiNode, err := c.csiNodeInfo.GetCSINodeInfo(node.Name)
if err != nil {
// we don't fail here because the CSINode object is only necessary
// for determining whether the migration is enabled or not
klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
}
// if a plugin has been migrated to a CSI driver, defer to the CSI predicate
if c.filter.IsMigrated(csiNode) {
return true, nil, nil return true, nil, nil
} }
@ -514,7 +533,7 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata,
} }
numNewVolumes := len(newVolumes) numNewVolumes := len(newVolumes)
maxAttachLimit := c.maxVolumeFunc(nodeInfo.Node()) maxAttachLimit := c.maxVolumeFunc(node)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
volumeLimits := nodeInfo.VolumeLimits() volumeLimits := nodeInfo.VolumeLimits()

View File

@ -19,8 +19,9 @@ package predicates
import ( import (
"fmt" "fmt"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1" storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
) )
// FakePersistentVolumeClaimInfo declares a []v1.PersistentVolumeClaim type for testing. // FakePersistentVolumeClaimInfo declares a []v1.PersistentVolumeClaim type for testing.
@ -58,6 +59,15 @@ func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, fmt.Errorf("Unable to find node: %s", nodeName) return nil, fmt.Errorf("Unable to find node: %s", nodeName)
} }
// FakeCSINodeInfo declares a storagev1beta1.CSINode type for testing.
type FakeCSINodeInfo storagev1beta1.CSINode
// GetCSINodeInfo returns a fake CSINode object.
func (n FakeCSINodeInfo) GetCSINodeInfo(name string) (*storagev1beta1.CSINode, error) {
csiNode := storagev1beta1.CSINode(n)
return &csiNode, nil
}
// FakePersistentVolumeInfo declares a []v1.PersistentVolume type for testing. // FakePersistentVolumeInfo declares a []v1.PersistentVolume type for testing.
type FakePersistentVolumeInfo []v1.PersistentVolume type FakePersistentVolumeInfo []v1.PersistentVolume

View File

@ -61,33 +61,33 @@ func init() {
factory.RegisterFitPredicateFactory( factory.RegisterFitPredicateFactory(
predicates.MaxEBSVolumeCountPred, predicates.MaxEBSVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate { func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
}, },
) )
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node // Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
factory.RegisterFitPredicateFactory( factory.RegisterFitPredicateFactory(
predicates.MaxGCEPDVolumeCountPred, predicates.MaxGCEPDVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate { func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
}, },
) )
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node // Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
factory.RegisterFitPredicateFactory( factory.RegisterFitPredicateFactory(
predicates.MaxAzureDiskVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate { func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
}, },
) )
factory.RegisterFitPredicateFactory( factory.RegisterFitPredicateFactory(
predicates.MaxCSIVolumeCountPred, predicates.MaxCSIVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate { func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewCSIMaxVolumeLimitPredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo) return predicates.NewCSIMaxVolumeLimitPredicate(args.CSINodeInfo, args.PVInfo, args.PVCInfo, args.StorageClassInfo)
}, },
) )
factory.RegisterFitPredicateFactory( factory.RegisterFitPredicateFactory(
predicates.MaxCinderVolumeCountPred, predicates.MaxCinderVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate { func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.StorageClassInfo, args.PVInfo, args.PVCInfo) return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
}, },
) )

View File

@ -559,6 +559,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
NodeLister: c.schedulerCache, NodeLister: c.schedulerCache,
PDBLister: c.pdbLister, PDBLister: c.pdbLister,
NodeInfo: c.schedulerCache, NodeInfo: c.schedulerCache,
CSINodeInfo: c.schedulerCache,
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister}, PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister}, PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister}, StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},

View File

@ -43,6 +43,7 @@ type PluginFactoryArgs struct {
NodeLister algorithm.NodeLister NodeLister algorithm.NodeLister
PDBLister algorithm.PDBLister PDBLister algorithm.PDBLister
NodeInfo predicates.NodeInfo NodeInfo predicates.NodeInfo
CSINodeInfo predicates.CSINodeInfo
PVInfo predicates.PersistentVolumeInfo PVInfo predicates.PersistentVolumeInfo
PVCInfo predicates.PersistentVolumeClaimInfo PVCInfo predicates.PersistentVolumeClaimInfo
StorageClassInfo predicates.StorageClassInfo StorageClassInfo predicates.StorageClassInfo

View File

@ -70,6 +70,7 @@ type schedulerCache struct {
// a map from pod key to podState. // a map from pod key to podState.
podStates map[string]*podState podStates map[string]*podState
nodes map[string]*nodeInfoListItem nodes map[string]*nodeInfoListItem
csiNodes map[string]*storagev1beta1.CSINode
// headNode points to the most recently updated NodeInfo in "nodes". It is the // headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list. // head of the linked list.
headNode *nodeInfoListItem headNode *nodeInfoListItem
@ -109,6 +110,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
nodes: make(map[string]*nodeInfoListItem), nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(nil), nodeTree: newNodeTree(nil),
csiNodes: make(map[string]*storagev1beta1.CSINode),
assumedPods: make(map[string]bool), assumedPods: make(map[string]bool),
podStates: make(map[string]*podState), podStates: make(map[string]*podState),
imageStates: make(map[string]*imageState), imageStates: make(map[string]*imageState),
@ -574,13 +576,7 @@ func (cache *schedulerCache) AddCSINode(csiNode *storagev1beta1.CSINode) error {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
n, ok := cache.nodes[csiNode.Name] cache.csiNodes[csiNode.Name] = csiNode
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[csiNode.Name] = n
}
n.info.SetCSINode(csiNode)
cache.moveNodeInfoToHead(csiNode.Name)
return nil return nil
} }
@ -588,13 +584,7 @@ func (cache *schedulerCache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
n, ok := cache.nodes[newCSINode.Name] cache.csiNodes[newCSINode.Name] = newCSINode
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[newCSINode.Name] = n
}
n.info.SetCSINode(newCSINode)
cache.moveNodeInfoToHead(newCSINode.Name)
return nil return nil
} }
@ -602,12 +592,11 @@ func (cache *schedulerCache) RemoveCSINode(csiNode *storagev1beta1.CSINode) erro
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
n, ok := cache.nodes[csiNode.Name] _, ok := cache.csiNodes[csiNode.Name]
if !ok { if !ok {
return fmt.Errorf("node %v is not found", csiNode.Name) return fmt.Errorf("csinode %v is not found", csiNode.Name)
} }
n.info.SetCSINode(nil) delete(cache.csiNodes, csiNode.Name)
cache.moveNodeInfoToHead(csiNode.Name)
return nil return nil
} }
@ -736,3 +725,15 @@ func (cache *schedulerCache) ListNodes() []*v1.Node {
} }
return nodes return nodes
} }
func (cache *schedulerCache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
n, ok := cache.csiNodes[nodeName]
if !ok {
return nil, fmt.Errorf("error retrieving csinode '%v' from cache", nodeName)
}
return n, nil
}

View File

@ -114,3 +114,8 @@ func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
func (c *Cache) ListNodes() []*v1.Node { func (c *Cache) ListNodes() []*v1.Node {
return nil return nil
} }
// GetCSINodeInfo is a fake method for testing.
func (c *Cache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
return nil, nil
}

View File

@ -17,7 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1" storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -115,6 +115,9 @@ type Cache interface {
// GetNodeInfo returns the node object with node string. // GetNodeInfo returns the node object with node string.
GetNodeInfo(nodeName string) (*v1.Node, error) GetNodeInfo(nodeName string) (*v1.Node, error)
// GetCSINodeInfo returns the csinode object with the given name.
GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
// Snapshot takes a snapshot on current cache // Snapshot takes a snapshot on current cache
Snapshot() *Snapshot Snapshot() *Snapshot

View File

@ -13,9 +13,7 @@ go_library(
"//pkg/apis/core/v1/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",

View File

@ -23,14 +23,12 @@ import (
"sync/atomic" "sync/atomic"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog" "k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
) )
var ( var (
@ -49,8 +47,7 @@ type ImageStateSummary struct {
// NodeInfo is node level aggregated information. // NodeInfo is node level aggregated information.
type NodeInfo struct { type NodeInfo struct {
// Overall node information. // Overall node information.
node *v1.Node node *v1.Node
csiNode *storagev1beta1.CSINode
pods []*v1.Pod pods []*v1.Pod
podsWithAffinity []*v1.Pod podsWithAffinity []*v1.Pod
@ -292,14 +289,6 @@ func (n *NodeInfo) Node() *v1.Node {
return n.node return n.node
} }
// CSINode returns overall CSI-related information about this node.
func (n *NodeInfo) CSINode() *storagev1beta1.CSINode {
if n == nil {
return nil
}
return n.csiNode
}
// Pods return all pods scheduled (including assumed to be) on this node. // Pods return all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) Pods() []*v1.Pod { func (n *NodeInfo) Pods() []*v1.Pod {
if n == nil { if n == nil {
@ -449,7 +438,6 @@ func (n *NodeInfo) SetGeneration(newGeneration int64) {
func (n *NodeInfo) Clone() *NodeInfo { func (n *NodeInfo) Clone() *NodeInfo {
clone := &NodeInfo{ clone := &NodeInfo{
node: n.node, node: n.node,
csiNode: n.csiNode,
requestedResource: n.requestedResource.Clone(), requestedResource: n.requestedResource.Clone(),
nonzeroRequest: n.nonzeroRequest.Clone(), nonzeroRequest: n.nonzeroRequest.Clone(),
allocatableResource: n.allocatableResource.Clone(), allocatableResource: n.allocatableResource.Clone(),
@ -487,24 +475,11 @@ func (n *NodeInfo) Clone() *NodeInfo {
// VolumeLimits returns volume limits associated with the node // VolumeLimits returns volume limits associated with the node
func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 { func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
volumeLimits := map[v1.ResourceName]int64{} volumeLimits := map[v1.ResourceName]int64{}
for k, v := range n.AllocatableResource().ScalarResources { for k, v := range n.AllocatableResource().ScalarResources {
if v1helper.IsAttachableVolumeResourceName(k) { if v1helper.IsAttachableVolumeResourceName(k) {
volumeLimits[k] = v volumeLimits[k] = v
} }
} }
if n.csiNode != nil {
for i := range n.csiNode.Spec.Drivers {
d := n.csiNode.Spec.Drivers[i]
if d.Allocatable != nil && d.Allocatable.Count != nil {
// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object
k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
volumeLimits[k] = int64(*d.Allocatable.Count)
}
}
}
return volumeLimits return volumeLimits
} }
@ -688,11 +663,6 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
return nil return nil
} }
// SetCSINode sets the overall CSI-related node information.
func (n *NodeInfo) SetCSINode(csiNode *storagev1beta1.CSINode) {
n.csiNode = csiNode
}
// FilterOutPods receives a list of pods and filters out those whose node names // FilterOutPods receives a list of pods and filters out those whose node names
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo. // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
// //