diff --git a/docs/devel/scheduler_algorithm.md b/docs/devel/scheduler_algorithm.md
index 00a812a55c9..786666caae4 100755
--- a/docs/devel/scheduler_algorithm.md
+++ b/docs/devel/scheduler_algorithm.md
@@ -47,6 +47,8 @@ The purpose of filtering the nodes is to filter out the nodes that do not meet c
 - `PodFitsHost`: Filter out all nodes except the one specified in the PodSpec's NodeName field.
 - `PodSelectorMatches`: Check if the labels of the node match the labels specified in the Pod's `nodeSelector` field ([Here](../user-guide/node-selection/) is an example of how to use `nodeSelector` field).
 - `CheckNodeLabelPresence`: Check if all the specified labels exist on a node or not, regardless of the value.
+- `MaxEBSVolumeCount`: Ensure that the number of attached ElasticBlockStore volumes does not exceed a maximum value (by default, 39, since Amazon recommends a maximum of 40 with one of those 40 reserved for the root volume -- see [Amazon's documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html#linux-specific-volume-limits)). The maximum value can be controlled by setting the `KUBE_MAX_PD_VOLS` environment variable.
+- `MaxGCEPDVolumeCount`: Ensure that the number of attached GCE PersistentDisk volumes does not exceed a maximum value (by default, 16, which is the maximum GCE allows -- see [GCE's documentation](https://cloud.google.com/compute/docs/disks/persistent-disks#limits_for_predefined_machine_types)). The maximum value can be controlled by setting the `KUBE_MAX_PD_VOLS` environment variable.

 The details of the above predicates can be found in [plugin/pkg/scheduler/algorithm/predicates/predicates.go](http://releases.k8s.io/HEAD/plugin/pkg/scheduler/algorithm/predicates/predicates.go). All predicates mentioned above can be used in combination to perform a sophisticated filtering policy. Kubernetes uses some, but not all, of these predicates by default. You can see which ones are used by default in [plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go](http://releases.k8s.io/HEAD/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go).

diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index f45c248ba61..4b3aedca8ed 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -145,6 +145,141 @@ func NoDiskConflict(pod *api.Pod, existingPods []*api.Pod, node string) (bool, e
 	return true, nil
 }

+type MaxPDVolumeCountChecker struct {
+	filter     VolumeFilter
+	maxVolumes int
+	pvInfo     PersistentVolumeInfo
+	pvcInfo    PersistentVolumeClaimInfo
+}
+
+// VolumeFilter contains information on how to filter PD Volumes when checking PD Volume caps
+type VolumeFilter struct {
+	// Filter normal volumes
+	FilterVolume           func(vol *api.Volume) (id string, relevant bool)
+	FilterPersistentVolume func(pv *api.PersistentVolume) (id string, relevant bool)
+}
+
+// NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
+// number of volumes which match a filter that it requests, and those that are already present. The
+// maximum number is configurable to accommodate different systems.
+//
+// The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
+// types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
+// the maximum.
+func NewMaxPDVolumeCountPredicate(filter VolumeFilter, maxVolumes int, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
+	c := &MaxPDVolumeCountChecker{
+		filter:     filter,
+		maxVolumes: maxVolumes,
+		pvInfo:     pvInfo,
+		pvcInfo:    pvcInfo,
+	}
+
+	return c.predicate
+}
+
+func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace string, filteredVolumes map[string]bool) error {
+	for _, vol := range volumes {
+		if id, ok := c.filter.FilterVolume(&vol); ok {
+			filteredVolumes[id] = true
+		} else if vol.PersistentVolumeClaim != nil {
+			pvcName := vol.PersistentVolumeClaim.ClaimName
+			if pvcName == "" {
+				return fmt.Errorf("PersistentVolumeClaim had no name: %q", pvcName)
+			}
+			pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
+			if err != nil {
+				return err
+			}
+
+			pvName := pvc.Spec.VolumeName
+			if pvName == "" {
+				return fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
+			}
+
+			pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
+			if err != nil {
+				return err
+			}
+
+			if id, ok := c.filter.FilterPersistentVolume(pv); ok {
+				filteredVolumes[id] = true
+			}
+		}
+	}
+
+	return nil
+}
+
+func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
+	newVolumes := make(map[string]bool)
+	if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
+		return false, err
+	}
+
+	// quick return
+	if len(newVolumes) == 0 {
+		return true, nil
+	}
+
+	// count unique volumes
+	existingVolumes := make(map[string]bool)
+	for _, existingPod := range existingPods {
+		if err := c.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil {
+			return false, err
+		}
+	}
+	numExistingVolumes := len(existingVolumes)
+
+	// filter out already-mounted volumes
+	for k := range existingVolumes {
+		if _, ok := newVolumes[k]; ok {
+			delete(newVolumes, k)
+		}
+	}
+
+	numNewVolumes := len(newVolumes)
+
+	if numExistingVolumes+numNewVolumes > c.maxVolumes {
+		return false, nil
+	}
+
+	return true, nil
+}
+
+// EBSVolumeFilter is a VolumeFilter for filtering AWS ElasticBlockStore Volumes
+var EBSVolumeFilter VolumeFilter = VolumeFilter{
+	FilterVolume: func(vol *api.Volume) (string, bool) {
+		if vol.AWSElasticBlockStore != nil {
+			return vol.AWSElasticBlockStore.VolumeID, true
+		}
+		return "", false
+	},
+
+	FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
+		if pv.Spec.AWSElasticBlockStore != nil {
+			return pv.Spec.AWSElasticBlockStore.VolumeID, true
+		}
+		return "", false
+	},
+}
+
+// GCEPDVolumeFilter is a VolumeFilter for filtering GCE PersistentDisk Volumes
+var GCEPDVolumeFilter VolumeFilter = VolumeFilter{
+	FilterVolume: func(vol *api.Volume) (string, bool) {
+		if vol.GCEPersistentDisk != nil {
+			return vol.GCEPersistentDisk.PDName, true
+		}
+		return "", false
+	},
+
+	FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
+		if pv.Spec.GCEPersistentDisk != nil {
+			return pv.Spec.GCEPersistentDisk.PDName, true
+		}
+		return "", false
+	},
+}
+
 type VolumeZoneChecker struct {
 	nodeInfo NodeInfo
 	pvInfo   PersistentVolumeInfo

diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
index 5dc1a148cee..0ce09a1a941 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -44,6 +44,28 @@ func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) {
 	return nil, fmt.Errorf("Unable to find node: %s", nodeName)
 }

+type FakePersistentVolumeClaimInfo []api.PersistentVolumeClaim
+
+func (pvcs FakePersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*api.PersistentVolumeClaim, error) {
+	for _, pvc := range pvcs {
+		if pvc.Name == pvcID && pvc.Namespace == namespace {
+			return &pvc, nil
+		}
+	}
+	return nil, fmt.Errorf("Unable to find persistent volume claim: %s/%s", namespace, pvcID)
+}
+
+type FakePersistentVolumeInfo []api.PersistentVolume
+
+func (pvs FakePersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*api.PersistentVolume, error) {
+	for _, pv := range pvs {
+		if pv.Name == pvID {
+			return &pv, nil
+		}
+	}
+	return nil, fmt.Errorf("Unable to find persistent volume: %s", pvID)
+}
+
 func makeResources(milliCPU int64, memory int64, pods int64) api.NodeResources {
 	return api.NodeResources{
 		Capacity: api.ResourceList{
@@ -771,3 +793,224 @@ func TestServiceAffinity(t *testing.T) {
 		}
 	}
 }
+
+func TestEBSVolumeCountConflicts(t *testing.T) {
+	oneVolPod := &api.Pod{
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					VolumeSource: api.VolumeSource{
+						AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp"},
+					},
+				},
+			},
+		},
+	}
+	ebsPVCPod := &api.Pod{
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					VolumeSource: api.VolumeSource{
+						PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
+							ClaimName: "someEBSVol",
+						},
+					},
+				},
+			},
+		},
+	}
+	splitPVCPod := &api.Pod{
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					VolumeSource: api.VolumeSource{
+						PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
+							ClaimName: "someNonEBSVol",
+						},
+					},
+				},
+				{
+					VolumeSource: api.VolumeSource{
+						PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
+							ClaimName: "someEBSVol",
+						},
+					},
+				},
+			},
+		},
+	}
+	twoVolPod := &api.Pod{
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					VolumeSource: api.VolumeSource{
+						AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{VolumeID: "tvp1"},
+					},
+				},
+				{
+					VolumeSource: api.VolumeSource{
+						AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{VolumeID: "tvp2"},
+					},
+				},
+			},
+		},
+	}
+	splitVolsPod := &api.Pod{
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					VolumeSource: api.VolumeSource{
+						HostPath: &api.HostPathVolumeSource{},
+					},
+				},
+				{
+					VolumeSource: api.VolumeSource{
+						AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{VolumeID: "svp"},
+					},
+				},
+			},
+		},
+	}
+	nonApplicablePod := &api.Pod{
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					VolumeSource: api.VolumeSource{
+						HostPath: &api.HostPathVolumeSource{},
+					},
+				},
+			},
+		},
+	}
+	emptyPod := &api.Pod{
+		Spec: api.PodSpec{},
+	}
+
+	tests := []struct {
+		newPod       *api.Pod
+		existingPods []*api.Pod
+		maxVols      int
+		fits         bool
+		test         string
+	}{
+		{
+			newPod:       oneVolPod,
+			existingPods: []*api.Pod{twoVolPod, oneVolPod},
+			maxVols:      4,
+			fits:         true,
+			test:         "fits when node capacity >= new pod's EBS volumes",
+		},
+		{
+			newPod:       twoVolPod,
+			existingPods: []*api.Pod{oneVolPod},
+			maxVols:      2,
+			fits:         false,
+			test:         "doesn't fit when node capacity < new pod's EBS volumes",
+		},
+		{
+			newPod:       splitVolsPod,
+			existingPods: []*api.Pod{twoVolPod},
+			maxVols:      3,
+			fits:         true,
+			test:         "new pod's count ignores non-EBS volumes",
+		},
+		{
+			newPod:       twoVolPod,
+			existingPods: []*api.Pod{splitVolsPod, nonApplicablePod, emptyPod},
+			maxVols:      3,
+			fits:         true,
+			test:         "existing pods' counts ignore non-EBS volumes",
+		},
+		{
+			newPod:       ebsPVCPod,
+			existingPods: []*api.Pod{splitVolsPod, nonApplicablePod, emptyPod},
+			maxVols:      3,
+			fits:         true,
+			test:         "new pod's count considers PVCs backed by EBS volumes",
+		},
+		{
+			newPod:       splitPVCPod,
+			existingPods: []*api.Pod{splitVolsPod, oneVolPod},
+			maxVols:      3,
+			fits:         true,
+			test:         "new pod's count ignores PVCs not backed by EBS volumes",
+		},
+		{
+			newPod:       twoVolPod,
+			existingPods: []*api.Pod{oneVolPod, ebsPVCPod},
+			maxVols:      3,
+			fits:         false,
+			test:         "existing pods' counts consider PVCs backed by EBS volumes",
+		},
+		{
+			newPod:       twoVolPod,
+			existingPods: []*api.Pod{oneVolPod, twoVolPod, ebsPVCPod},
+			maxVols:      4,
+			fits:         true,
+			test:         "already-mounted EBS volumes are always ok to allow",
+		},
+		{
+			newPod:       splitVolsPod,
+			existingPods: []*api.Pod{oneVolPod, oneVolPod, ebsPVCPod},
+			maxVols:      3,
+			fits:         true,
+			test:         "the same EBS volumes are not counted multiple times",
+		},
+	}
+
+	pvInfo := FakePersistentVolumeInfo{
+		{
+			ObjectMeta: api.ObjectMeta{Name: "someEBSVol"},
+			Spec: api.PersistentVolumeSpec{
+				PersistentVolumeSource: api.PersistentVolumeSource{
+					AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{},
+				},
+			},
+		},
+		{
+			ObjectMeta: api.ObjectMeta{Name: "someNonEBSVol"},
+			Spec: api.PersistentVolumeSpec{
+				PersistentVolumeSource: api.PersistentVolumeSource{},
+			},
+		},
+	}
+
+	pvcInfo := FakePersistentVolumeClaimInfo{
+		{
+			ObjectMeta: api.ObjectMeta{Name: "someEBSVol"},
+			Spec:       api.PersistentVolumeClaimSpec{VolumeName: "someEBSVol"},
+		},
+		{
+			ObjectMeta: api.ObjectMeta{Name: "someNonEBSVol"},
+			Spec:       api.PersistentVolumeClaimSpec{VolumeName: "someNonEBSVol"},
+		},
+	}
+
+	filter := VolumeFilter{
+		FilterVolume: func(vol *api.Volume) (string, bool) {
+			if vol.AWSElasticBlockStore != nil {
+				return vol.AWSElasticBlockStore.VolumeID, true
+			}
+			return "", false
+		},
+
+		FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
+			if pv.Spec.AWSElasticBlockStore != nil {
+				return pv.Spec.AWSElasticBlockStore.VolumeID, true
+			}
+			return "", false
+		},
+	}
+
+	for _, test := range tests {
+		pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo)
+		fits, err := pred(test.newPod, test.existingPods, "some-node")
+		if err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+
+		if fits != test.fits {
+			t.Errorf("%s: expected %v, got %v", test.test, test.fits, fits)
+		}
+	}
+}

diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
index 7704dd4e260..5a347b2a50c 100644
--- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
+++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -18,14 +18,41 @@ limitations under the License.
 package defaults

 import (
+	"os"
+	"strconv"
+
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/plugin/pkg/scheduler"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
+
+	"github.com/golang/glog"
 )

+// Amazon recommends having no more than 40 volumes attached to an instance,
+// and at least one of those is for the system root volume.
+const DefaultMaxEBSVolumes = 39
+
+// GCE instances can have up to 16 PD volumes attached.
+const DefaultMaxGCEPDVolumes = 16
+
+// getMaxVols checks the max PD volumes environment variable, otherwise returning a default value
+func getMaxVols(defaultVal int) int {
+	if rawMaxVols := os.Getenv("KUBE_MAX_PD_VOLS"); rawMaxVols != "" {
+		if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil {
+			glog.Errorf("Unable to parse maximum PD volumes value, using default of %v: %v", defaultVal, err)
+		} else if parsedMaxVols <= 0 {
+			glog.Errorf("Maximum PD volumes must be a positive value, using default of %v", defaultVal)
+		} else {
+			return parsedMaxVols
+		}
+	}
+
+	return defaultVal
+}
+
 func init() {
 	factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())
 	// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
@@ -80,6 +107,26 @@ func defaultPredicates() sets.String {
 		),
 		// Fit is determined by the presence of the Host parameter and a string match
 		factory.RegisterFitPredicate("HostName", predicates.PodFitsHost),
+
+		// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
+		factory.RegisterFitPredicateFactory(
+			"MaxEBSVolumeCount",
+			func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
+				// TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
+				maxVols := getMaxVols(DefaultMaxEBSVolumes)
+				return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
+			},
+		),
+
+		// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
+		factory.RegisterFitPredicateFactory(
+			"MaxGCEPDVolumeCount",
+			func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
+				// TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
+				maxVols := getMaxVols(DefaultMaxGCEPDVolumes)
+				return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
+			},
+		),
 	)
 }
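As a usage illustration (not part of the patch above), the sketch below shows how the exported pieces fit together: a hypothetical helper builds the predicate from `predicates.EBSVolumeFilter` with the default cap of 39 volumes and evaluates one candidate pod, mirroring the call pattern in `TestEBSVolumeCountConflicts`. The helper name, package name, and node name are placeholders; `pvInfo`/`pvcInfo` can be the scheduler's listers or the fakes from `predicates_test.go`.

```go
package example

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

// checkEBSFit is a hypothetical helper: it builds a FitPredicate with the exported
// EBSVolumeFilter and a cap of 39 unique EBS volumes (the default in defaults.go),
// then evaluates whether newPod still fits on a node given the pods already there.
func checkEBSFit(newPod *api.Pod, existingPods []*api.Pod,
	pvInfo predicates.PersistentVolumeInfo, pvcInfo predicates.PersistentVolumeClaimInfo) (bool, error) {

	pred := predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilter, 39, pvInfo, pvcInfo)

	// The predicate returns (false, nil) when the pod's unique EBS-backed volumes,
	// added to those already used on the node, would exceed the configured maximum.
	fits, err := pred(newPod, existingPods, "some-node")
	if err != nil {
		// Errors come from PV/PVC lookups, e.g. an unbound PersistentVolumeClaim.
		return false, fmt.Errorf("volume count predicate failed: %v", err)
	}
	return fits, nil
}
```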