Merge pull request #86601 from angao/no-disk-conflict

move NoDiskConflict predicate to its filter plugin
Authored by Kubernetes Prow Robot on 2019-12-25 15:27:38 -08:00; committed by GitHub
commit c84b1a8a10
5 changed files with 88 additions and 324 deletions
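The change retires the standalone NoDiskConflict fit predicate and moves its check into the VolumeRestrictions plugin's Filter extension point. For orientation, the sketch below (not part of the diff) shows the shape of that contract, using only names that appear in the plugin file at the end of this commit; the helper function, its package, and its exact wiring are illustrative assumptions.

package example // illustrative sketch only, not part of the commit

import (
	"context"

	v1 "k8s.io/api/core/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// runFilter shows how a framework filter plugin is driven per candidate node:
// a nil or Success status means the pod fits; an Unschedulable status carries
// the rejection reason (ErrDiskConflict for VolumeRestrictions).
func runFilter(pl framework.FilterPlugin, pod *v1.Pod, ni *nodeinfo.NodeInfo) bool {
	status := pl.Filter(context.Background(), nil, pod, ni)
	return status.IsSuccess()
}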


@@ -147,73 +147,6 @@ func Ordering() []string {
// The failure information is given by the error.
type FitPredicate func(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error)
func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool {
// Fast path if there are no conflict-checking targets.
if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil && volume.ISCSI == nil {
return false
}
for _, existingVolume := range pod.Spec.Volumes {
// Same GCE disk mounted by multiple pods conflicts unless all pods mount it read-only.
if volume.GCEPersistentDisk != nil && existingVolume.GCEPersistentDisk != nil {
disk, existingDisk := volume.GCEPersistentDisk, existingVolume.GCEPersistentDisk
if disk.PDName == existingDisk.PDName && !(disk.ReadOnly && existingDisk.ReadOnly) {
return true
}
}
if volume.AWSElasticBlockStore != nil && existingVolume.AWSElasticBlockStore != nil {
if volume.AWSElasticBlockStore.VolumeID == existingVolume.AWSElasticBlockStore.VolumeID {
return true
}
}
if volume.ISCSI != nil && existingVolume.ISCSI != nil {
iqn := volume.ISCSI.IQN
eiqn := existingVolume.ISCSI.IQN
// Two iSCSI volumes are the same if they share the same IQN. As iSCSI volumes are of type
// RWO or ROX, only one read-write mount can be permitted. The same iSCSI volume mounted by
// multiple pods conflicts unless all pods mount it read-only.
if iqn == eiqn && !(volume.ISCSI.ReadOnly && existingVolume.ISCSI.ReadOnly) {
return true
}
}
if volume.RBD != nil && existingVolume.RBD != nil {
mon, pool, image := volume.RBD.CephMonitors, volume.RBD.RBDPool, volume.RBD.RBDImage
emon, epool, eimage := existingVolume.RBD.CephMonitors, existingVolume.RBD.RBDPool, existingVolume.RBD.RBDImage
// Two RBD images are the same if they share at least one Ceph monitor, are in the same RADOS pool, and have the same image name.
// Only one read-write mount is permitted for the same RBD image;
// the same RBD image mounted by multiple pods conflicts unless all pods mount the image read-only.
if haveOverlap(mon, emon) && pool == epool && image == eimage && !(volume.RBD.ReadOnly && existingVolume.RBD.ReadOnly) {
return true
}
}
}
return false
}
// NoDiskConflict evaluates whether a pod can fit based on the volumes it requests and those
// already mounted on the node. If a volume already mounted on the node conflicts with one the
// pod requests, the pod can't be scheduled there.
// This is GCE PD, Amazon EBS, Ceph RBD and iSCSI specific for now:
// - GCE PD allows multiple mounts as long as they're all read-only
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids two pods sharing at least one monitor with matching pool and image, unless both mount the image read-only
// - iSCSI forbids two pods sharing the same IQN, unless both mount the volume read-only
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
for _, v := range pod.Spec.Volumes {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
return false, []PredicateFailureReason{ErrDiskConflict}, nil
}
}
}
return true, nil, nil
}
// MaxPDVolumeCountChecker contains information to check the max number of volumes for a predicate.
type MaxPDVolumeCountChecker struct {
filter VolumeFilter
@@ -963,25 +896,6 @@ func PodFitsHostPortsPredicate(pod *v1.Pod, meta []*v1.ContainerPort, nodeInfo *
return true, nil, nil
}
// haveOverlap searches two arrays and returns true if they have at least one common element; returns false otherwise.
func haveOverlap(a1, a2 []string) bool {
if len(a1) > len(a2) {
a1, a2 = a2, a1
}
m := map[string]bool{}
for _, val := range a1 {
m[val] = true
}
for _, val := range a2 {
if _, ok := m[val]; ok {
return true
}
}
return false
}
// GeneralPredicates checks a group of predicates that the kubelet cares about.
// DEPRECATED: this exists only because kubelet uses it. We should change kubelet to execute the individual predicates it requires.
func GeneralPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
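For reference, here is a minimal sketch (not part of the diff) of the predicate removed above, exercised the way the deleted tests below do; the import paths are the ones used elsewhere in this commit, and the expected results follow the isVolumeConflict rules in this hunk.

package example // illustrative sketch only

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// gcePD builds a pod volume backed by the named GCE persistent disk.
func gcePD(name string, readOnly bool) v1.Volume {
	return v1.Volume{VolumeSource: v1.VolumeSource{
		GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: name, ReadOnly: readOnly},
	}}
}

func demoNoDiskConflict() (bool, bool) {
	readWrite := &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{gcePD("pd-1", false)}}}
	readOnly := &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{gcePD("pd-1", true)}}}

	// The same PD is already mounted read-write on the node: the pod does not fit
	// and the failure reason is ErrDiskConflict.
	fitsRW, _, _ := predicates.NoDiskConflict(readWrite, nil, schedulernodeinfo.NewNodeInfo(readWrite))

	// Every mount of the PD is read-only: no conflict, the pod fits.
	fitsRO, _, _ := predicates.NoDiskConflict(readOnly, nil, schedulernodeinfo.NewNodeInfo(readOnly))

	return fitsRW, fitsRO // false, true
}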


@@ -721,238 +721,6 @@ func TestPodFitsHostPorts(t *testing.T) {
}
}
func TestGCEDiskConflicts(t *testing.T) {
volState := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "foo",
},
},
},
},
}
volState2 := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "bar",
},
},
},
},
}
tests := []struct {
pod *v1.Pod
nodeInfo *schedulernodeinfo.NodeInfo
isOk bool
name string
}{
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(), true, "nothing"},
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "one state"},
{&v1.Pod{Spec: volState}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), false, "same state"},
{&v1.Pod{Spec: volState2}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "different state"},
}
expectedFailureReasons := []PredicateFailureReason{ErrDiskConflict}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !ok && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("unexpected failure reasons: %v, want: %v", reasons, expectedFailureReasons)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %s", test.pod, test.nodeInfo)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %s", test.pod, test.nodeInfo)
}
})
}
}
func TestAWSDiskConflicts(t *testing.T) {
volState := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "foo",
},
},
},
},
}
volState2 := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "bar",
},
},
},
},
}
tests := []struct {
pod *v1.Pod
nodeInfo *schedulernodeinfo.NodeInfo
isOk bool
name string
}{
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(), true, "nothing"},
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "one state"},
{&v1.Pod{Spec: volState}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), false, "same state"},
{&v1.Pod{Spec: volState2}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "different state"},
}
expectedFailureReasons := []PredicateFailureReason{ErrDiskConflict}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !ok && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("unexpected failure reasons: %v, want: %v", reasons, expectedFailureReasons)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %s", test.pod, test.nodeInfo)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %s", test.pod, test.nodeInfo)
}
})
}
}
func TestRBDDiskConflicts(t *testing.T) {
volState := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
RBD: &v1.RBDVolumeSource{
CephMonitors: []string{"a", "b"},
RBDPool: "foo",
RBDImage: "bar",
FSType: "ext4",
},
},
},
},
}
volState2 := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
RBD: &v1.RBDVolumeSource{
CephMonitors: []string{"c", "d"},
RBDPool: "foo",
RBDImage: "bar",
FSType: "ext4",
},
},
},
},
}
tests := []struct {
pod *v1.Pod
nodeInfo *schedulernodeinfo.NodeInfo
isOk bool
name string
}{
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(), true, "nothing"},
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "one state"},
{&v1.Pod{Spec: volState}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), false, "same state"},
{&v1.Pod{Spec: volState2}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "different state"},
}
expectedFailureReasons := []PredicateFailureReason{ErrDiskConflict}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !ok && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("unexpected failure reasons: %v, want: %v", reasons, expectedFailureReasons)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %s", test.pod, test.nodeInfo)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %s", test.pod, test.nodeInfo)
}
})
}
}
func TestISCSIDiskConflicts(t *testing.T) {
volState := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
ISCSI: &v1.ISCSIVolumeSource{
TargetPortal: "127.0.0.1:3260",
IQN: "iqn.2016-12.server:storage.target01",
FSType: "ext4",
Lun: 0,
},
},
},
},
}
volState2 := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
ISCSI: &v1.ISCSIVolumeSource{
TargetPortal: "127.0.0.1:3260",
IQN: "iqn.2017-12.server:storage.target01",
FSType: "ext4",
Lun: 0,
},
},
},
},
}
tests := []struct {
pod *v1.Pod
nodeInfo *schedulernodeinfo.NodeInfo
isOk bool
name string
}{
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(), true, "nothing"},
{&v1.Pod{}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "one state"},
{&v1.Pod{Spec: volState}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), false, "same state"},
{&v1.Pod{Spec: volState2}, schedulernodeinfo.NewNodeInfo(&v1.Pod{Spec: volState}), true, "different state"},
}
expectedFailureReasons := []PredicateFailureReason{ErrDiskConflict}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !ok && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("unexpected failure reasons: %v, want: %v", reasons, expectedFailureReasons)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %s", test.pod, test.nodeInfo)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %s", test.pod, test.nodeInfo)
}
})
}
}
// TODO: Add test case for RequiredDuringSchedulingRequiredDuringExecution after it's implemented.
func TestPodFitsSelector(t *testing.T) {
tests := []struct {
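One detail worth noting in TestRBDDiskConflicts above: the "different state" fixture keeps the same pool ("foo") and image ("bar") and differs only in its monitor list, so it is accepted solely because haveOverlap finds no common monitor. The sketch below (not part of the diff) isolates that check; haveOverlap is copied verbatim from the predicate hunk earlier in this commit.

package example // illustrative sketch only

// haveOverlap is copied from the removed predicates code above: it reports
// whether the two string slices share at least one element.
func haveOverlap(a1, a2 []string) bool {
	if len(a1) > len(a2) {
		a1, a2 = a2, a1
	}
	m := map[string]bool{}
	for _, val := range a1 {
		m[val] = true
	}
	for _, val := range a2 {
		if _, ok := m[val]; ok {
			return true
		}
	}
	return false
}

// With the RBD test fixtures: disjoint monitor sets mean no conflict even though
// pool and image match; identical monitor sets would conflict because neither
// mount is read-only.
var differentState = haveOverlap([]string{"a", "b"}, []string{"c", "d"}) // false -> pod fits
var sameState = haveOverlap([]string{"a", "b"}, []string{"a", "b"})      // true  -> ErrDiskConflict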


@@ -117,7 +117,12 @@ func init() {
)
// Fit is determined by non-conflicting disk volumes.
scheduler.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict)
scheduler.RegisterFitPredicateFactory(
predicates.NoDiskConflictPred,
func(args scheduler.AlgorithmFactoryArgs) predicates.FitPredicate {
return nil
},
)
// GeneralPredicates are the predicates that are enforced by all Kubernetes components
// (e.g. kubelet and all schedulers)
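For contrast, the sketch below (illustrative only; identifiers come from the hunk above, import paths are assumed) shows what a factory that still executed the legacy predicate would look like. Returning nil instead, as the new registration does, keeps NoDiskConflictPred as a known predicate name for existing providers and policy configurations while leaving the actual check to the VolumeRestrictions Filter plugin at the end of this commit.

package example // illustrative sketch only; import paths assumed

import (
	"k8s.io/kubernetes/pkg/scheduler"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)

func registerLegacyNoDiskConflict() {
	// A factory that still ran the old predicate function; the commit returns
	// nil from the factory instead, disabling the legacy code path.
	scheduler.RegisterFitPredicateFactory(
		predicates.NoDiskConflictPred,
		func(args scheduler.AlgorithmFactoryArgs) predicates.FitPredicate {
			return predicates.NoDiskConflict
		},
	)
}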


@@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",


@@ -22,7 +22,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@@ -40,11 +39,90 @@ func (pl *VolumeRestrictions) Name() string {
return Name
}
func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool {
// Fast path if there are no conflict-checking targets.
if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil && volume.ISCSI == nil {
return false
}
for _, existingVolume := range pod.Spec.Volumes {
// Same GCE disk mounted by multiple pods conflicts unless all pods mount it read-only.
if volume.GCEPersistentDisk != nil && existingVolume.GCEPersistentDisk != nil {
disk, existingDisk := volume.GCEPersistentDisk, existingVolume.GCEPersistentDisk
if disk.PDName == existingDisk.PDName && !(disk.ReadOnly && existingDisk.ReadOnly) {
return true
}
}
if volume.AWSElasticBlockStore != nil && existingVolume.AWSElasticBlockStore != nil {
if volume.AWSElasticBlockStore.VolumeID == existingVolume.AWSElasticBlockStore.VolumeID {
return true
}
}
if volume.ISCSI != nil && existingVolume.ISCSI != nil {
iqn := volume.ISCSI.IQN
eiqn := existingVolume.ISCSI.IQN
// Two iSCSI volumes are the same if they share the same IQN. As iSCSI volumes are of type
// RWO or ROX, only one read-write mount can be permitted. The same iSCSI volume mounted by
// multiple pods conflicts unless all pods mount it read-only.
if iqn == eiqn && !(volume.ISCSI.ReadOnly && existingVolume.ISCSI.ReadOnly) {
return true
}
}
if volume.RBD != nil && existingVolume.RBD != nil {
mon, pool, image := volume.RBD.CephMonitors, volume.RBD.RBDPool, volume.RBD.RBDImage
emon, epool, eimage := existingVolume.RBD.CephMonitors, existingVolume.RBD.RBDPool, existingVolume.RBD.RBDImage
// Two RBD images are the same if they share at least one Ceph monitor, are in the same RADOS pool, and have the same image name.
// Only one read-write mount is permitted for the same RBD image;
// the same RBD image mounted by multiple pods conflicts unless all pods mount the image read-only.
if haveOverlap(mon, emon) && pool == epool && image == eimage && !(volume.RBD.ReadOnly && existingVolume.RBD.ReadOnly) {
return true
}
}
}
return false
}
// haveOverlap searches two arrays and returns true if they have at least one common element; returns false otherwise.
func haveOverlap(a1, a2 []string) bool {
if len(a1) > len(a2) {
a1, a2 = a2, a1
}
m := map[string]bool{}
for _, val := range a1 {
m[val] = true
}
for _, val := range a2 {
if _, ok := m[val]; ok {
return true
}
}
return false
}
// Filter is invoked at the filter extension point.
// It evaluates whether a pod can fit based on the volumes it requests and those already mounted
// on the node. If a volume already mounted on the node conflicts with one the pod requests,
// the pod can't be scheduled there.
// This is GCE PD, Amazon EBS, Ceph RBD and iSCSI specific for now:
// - GCE PD allows multiple mounts as long as they're all read-only
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids two pods sharing at least one monitor with matching pool and image, unless both mount the image read-only
// - iSCSI forbids two pods sharing the same IQN, unless both mount the volume read-only
func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
// metadata is not needed for NoDiskConflict
_, reasons, err := predicates.NoDiskConflict(pod, nil, nodeInfo)
return migration.PredicateResultToFrameworkStatus(reasons, err)
for _, v := range pod.Spec.Volumes {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
return framework.NewStatus(framework.Unschedulable, predicates.ErrDiskConflict.GetReason())
}
}
}
return nil
}
// New initializes a new plugin and returns it.
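A minimal usage sketch of the Filter method above, mirroring the removed GCE test case and complementing the pre-move sketch after the first hunk (not part of the diff; the package name is assumed, and the zero-value plugin is used only because Filter reads no receiver state):

package volumerestrictions // assumed package name for the plugin file above

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// demoFilter rejects a pod whose GCE PD is already mounted read-write on the node.
func demoFilter() {
	pd := v1.Volume{VolumeSource: v1.VolumeSource{
		GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "pd-1"},
	}}
	incoming := &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{pd}}}
	existing := &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{pd}}}

	pl := &VolumeRestrictions{} // zero value: Filter above uses no receiver state
	status := pl.Filter(context.Background(), nil, incoming, nodeinfo.NewNodeInfo(existing))

	fmt.Println(status.Code() == framework.Unschedulable, status.Message()) // true, the ErrDiskConflict reason
}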