From c93646e8da79c8531a750fa26463bfffb4931f27 Mon Sep 17 00:00:00 2001
From: Connor Doyle
Date: Mon, 26 Sep 2016 08:11:31 -0700
Subject: [PATCH] Support opaque integer resource accounting.

- Prevents kubelet from overwriting capacity during sync.
- Handles opaque integer resources in the scheduler.
- Adds scheduler predicate tests for opaque resources.
- Validates opaque int resources:
  - Ensures supplied opaque int quantities in node capacity,
    node allocatable, pod request and pod limit are integers.
  - Adds tests for new validation logic (node update and pod spec).
- Added e2e tests for opaque integer resources.
---
 pkg/api/helpers.go                            |  18 +-
 pkg/api/types.go                              |   5 +
 pkg/api/validation/validation.go              |  24 +-
 pkg/api/validation/validation_test.go         | 206 ++++++++++
 pkg/kubelet/kubelet_node_status.go            |  23 +-
 .../algorithm/predicates/predicates.go        |  62 ++-
 .../algorithm/predicates/predicates_test.go   | 134 ++++++-
 plugin/pkg/scheduler/schedulercache/BUILD     |   1 +
 .../pkg/scheduler/schedulercache/node_info.go | 102 +++--
 test/e2e/BUILD                                |   1 +
 test/e2e/opaque_resource.go                   | 373 ++++++++++++++++++
 11 files changed, 883 insertions(+), 66 deletions(-)
 mode change 100755 => 100644 plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
 create mode 100644 test/e2e/opaque_resource.go

diff --git a/pkg/api/helpers.go b/pkg/api/helpers.go
index 3de57d4b9d2..58d1a33b532 100644
--- a/pkg/api/helpers.go
+++ b/pkg/api/helpers.go
@@ -120,6 +120,22 @@ func IsStandardContainerResourceName(str string) bool {
 	return standardContainerResources.Has(str)
 }
 
+// IsOpaqueIntResourceName returns true if the resource name has the opaque
+// integer resource prefix.
+func IsOpaqueIntResourceName(name ResourceName) bool {
+	return strings.HasPrefix(string(name), ResourceOpaqueIntPrefix)
+}
+
+// OpaqueIntResourceName returns a ResourceName with the canonical opaque
+// integer prefix prepended. If the argument already has the prefix, it is
+// returned unmodified.
+func OpaqueIntResourceName(name string) ResourceName {
+	if IsOpaqueIntResourceName(ResourceName(name)) {
+		return ResourceName(name)
+	}
+	return ResourceName(fmt.Sprintf("%s%s", ResourceOpaqueIntPrefix, name))
+}
+
 var standardLimitRangeTypes = sets.NewString(
 	string(LimitTypePod),
 	string(LimitTypeContainer),
@@ -193,7 +209,7 @@ var integerResources = sets.NewString(
 
 // IsIntegerResourceName returns true if the resource is measured in integer values
 func IsIntegerResourceName(str string) bool {
-	return integerResources.Has(str)
+	return integerResources.Has(str) || IsOpaqueIntResourceName(ResourceName(str))
 }
 
 // NewDeleteOptions returns a DeleteOptions indicating the resource should
diff --git a/pkg/api/types.go b/pkg/api/types.go
index d379cc40786..ee44868ae5f 100644
--- a/pkg/api/types.go
+++ b/pkg/api/types.go
@@ -2631,6 +2631,11 @@ const (
 	// Number of Pods that may be running on this Node: see ResourcePods
 )
 
+const (
+	// Namespace prefix for opaque counted resources (alpha).
+	ResourceOpaqueIntPrefix = "pod.alpha.kubernetes.io/opaque-int-resource-"
+)
+
 // ResourceList is a set of (resource name, quantity) pairs.
type ResourceList map[ResourceName]resource.Quantity diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index cc9689eee8f..871ff7b22b7 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -2845,6 +2845,17 @@ func ValidateNodeUpdate(node, oldNode *api.Node) field.ErrorList { // allErrs = append(allErrs, field.Invalid("status", node.Status, "must be empty")) // } + // Validate resource quantities in capacity. + for k, v := range node.Status.Capacity { + resPath := field.NewPath("status", "capacity", string(k)) + allErrs = append(allErrs, ValidateResourceQuantityValue(string(k), v, resPath)...) + } + // Validate resource quantities in allocatable. + for k, v := range node.Status.Allocatable { + resPath := field.NewPath("status", "allocatable", string(k)) + allErrs = append(allErrs, ValidateResourceQuantityValue(string(k), v, resPath)...) + } + // Validte no duplicate addresses in node status. addresses := make(map[api.NodeAddress]bool) for i, address := range node.Status.Addresses { @@ -3236,9 +3247,10 @@ func ValidateResourceRequirements(requirements *api.ResourceRequirements, fldPat fldPath := limPath.Key(string(resourceName)) // Validate resource name. allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...) - if api.IsStandardResourceName(string(resourceName)) { - allErrs = append(allErrs, validateBasicResource(quantity, fldPath.Key(string(resourceName)))...) - } + + // Validate resource quantity. + allErrs = append(allErrs, ValidateResourceQuantityValue(string(resourceName), quantity, fldPath)...) + // Check that request <= limit. requestQuantity, exists := requirements.Requests[resourceName] if exists { @@ -3254,10 +3266,10 @@ func ValidateResourceRequirements(requirements *api.ResourceRequirements, fldPat fldPath := reqPath.Key(string(resourceName)) // Validate resource name. allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...) - if api.IsStandardResourceName(string(resourceName)) { - allErrs = append(allErrs, validateBasicResource(quantity, fldPath.Key(string(resourceName)))...) - } + // Validate resource quantity. + allErrs = append(allErrs, ValidateResourceQuantityValue(string(resourceName), quantity, fldPath)...) 
} + return allErrs } diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 425ee031780..5630526b7be 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -3665,6 +3665,52 @@ func TestValidatePod(t *testing.T) { }, Spec: validPodSpec, }, + { // valid opaque integer resources for init container + ObjectMeta: api.ObjectMeta{Name: "valid-opaque-int", Namespace: "ns"}, + Spec: api.PodSpec{ + InitContainers: []api.Container{ + { + Name: "valid-opaque-int", + Image: "image", + ImagePullPolicy: "IfNotPresent", + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("10"), + }, + Limits: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("20"), + }, + }, + }, + }, + Containers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}}, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, + { // valid opaque integer resources for regular container + ObjectMeta: api.ObjectMeta{Name: "valid-opaque-int", Namespace: "ns"}, + Spec: api.PodSpec{ + InitContainers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}}, + Containers: []api.Container{ + { + Name: "valid-opaque-int", + Image: "image", + ImagePullPolicy: "IfNotPresent", + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("10"), + }, + Limits: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("20"), + }, + }, + }, + }, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, } for _, pod := range successCases { if errs := ValidatePod(&pod); len(errs) != 0 { @@ -4155,6 +4201,112 @@ func TestValidatePod(t *testing.T) { }, Spec: validPodSpec, }, + "invalid opaque integer resource requirement: request must be <= limit": { + ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "invalid", + Image: "image", + ImagePullPolicy: "IfNotPresent", + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("2"), + }, + Limits: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("1"), + }, + }, + }, + }, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, + "invalid fractional opaque integer resource in container request": { + ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "invalid", + Image: "image", + ImagePullPolicy: "IfNotPresent", + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("500m"), + }, + }, + }, + }, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, + "invalid fractional opaque integer resource in init container request": { + ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"}, + Spec: api.PodSpec{ + InitContainers: []api.Container{ + { + Name: "invalid", + Image: "image", + ImagePullPolicy: "IfNotPresent", + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("500m"), + }, + }, + }, + }, + Containers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}}, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: 
api.DNSClusterFirst, + }, + }, + "invalid fractional opaque integer resource in container limit": { + ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "invalid", + Image: "image", + ImagePullPolicy: "IfNotPresent", + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("5"), + }, + Limits: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("2.5"), + }, + }, + }, + }, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, + "invalid fractional opaque integer resource in init container limit": { + ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"}, + Spec: api.PodSpec{ + InitContainers: []api.Container{ + { + Name: "invalid", + Image: "image", + ImagePullPolicy: "IfNotPresent", + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("5"), + }, + Limits: api.ResourceList{ + api.OpaqueIntResourceName("A"): resource.MustParse("2.5"), + }, + }, + }, + }, + Containers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}}, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, } for k, v := range errorCases { if errs := ValidatePod(&v); len(errs) == 0 { @@ -6347,6 +6499,60 @@ func TestValidateNodeUpdate(t *testing.T) { }, }, }, false}, + {api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "valid-opaque-int-resources", + }, + }, api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "valid-opaque-int-resources", + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + api.OpaqueIntResourceName("A"): resource.MustParse("5"), + api.OpaqueIntResourceName("B"): resource.MustParse("10"), + }, + }, + }, true}, + {api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "invalid-fractional-opaque-int-capacity", + }, + }, api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "invalid-fractional-opaque-int-capacity", + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + api.OpaqueIntResourceName("A"): resource.MustParse("500m"), + }, + }, + }, false}, + {api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "invalid-fractional-opaque-int-allocatable", + }, + }, api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "invalid-fractional-opaque-int-allocatable", + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + api.OpaqueIntResourceName("A"): resource.MustParse("5"), + }, + Allocatable: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + api.OpaqueIntResourceName("A"): resource.MustParse("4.5"), + }, + }, + }, false}, } for i, test := range tests { test.oldNode.ObjectMeta.ResourceVersion = "1" diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index ee16cbeb6b4..6e554816154 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -436,23 +436,32 @@ func (kl *Kubelet) setNodeAddress(node *api.Node) error { } func (kl *Kubelet) setNodeStatusMachineInfo(node 
*api.Node) { + // Note: avoid blindly overwriting the capacity in case opaque + // resources are being advertised. + if node.Status.Capacity == nil { + node.Status.Capacity = api.ResourceList{} + } + // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. info, err := kl.GetCachedMachineInfo() if err != nil { // TODO(roberthbailey): This is required for test-cmd.sh to pass. // See if the test should be updated instead. - node.Status.Capacity = api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), - api.ResourceMemory: resource.MustParse("0Gi"), - api.ResourcePods: *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI), - } + node.Status.Capacity[api.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI) + node.Status.Capacity[api.ResourceMemory] = resource.MustParse("0Gi") + node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI) + node.Status.Capacity[api.ResourceNvidiaGPU] = *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI) + glog.Errorf("Error getting machine info: %v", err) } else { node.Status.NodeInfo.MachineID = info.MachineID node.Status.NodeInfo.SystemUUID = info.SystemUUID - node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info) + + for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) { + node.Status.Capacity[rName] = rCap + } + if kl.podsPerCore > 0 { node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI) diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index e57a975e485..ab8ce5ad16d 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -430,19 +430,53 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo * func GetResourceRequest(pod *api.Pod) *schedulercache.Resource { result := schedulercache.Resource{} for _, container := range pod.Spec.Containers { - requests := container.Resources.Requests - result.Memory += requests.Memory().Value() - result.MilliCPU += requests.Cpu().MilliValue() - result.NvidiaGPU += requests.NvidiaGPU().Value() + for rName, rQuantity := range container.Resources.Requests { + switch rName { + case api.ResourceMemory: + result.Memory += rQuantity.Value() + case api.ResourceCPU: + result.MilliCPU += rQuantity.MilliValue() + case api.ResourceNvidiaGPU: + result.NvidiaGPU += rQuantity.Value() + default: + if api.IsOpaqueIntResourceName(rName) { + // Lazily allocate this map only if required. 
+ if result.OpaqueIntResources == nil { + result.OpaqueIntResources = map[api.ResourceName]int64{} + } + result.OpaqueIntResources[rName] += rQuantity.Value() + } + } + } } // take max_resource(sum_pod, any_init_container) for _, container := range pod.Spec.InitContainers { - requests := container.Resources.Requests - if mem := requests.Memory().Value(); mem > result.Memory { - result.Memory = mem - } - if cpu := requests.Cpu().MilliValue(); cpu > result.MilliCPU { - result.MilliCPU = cpu + for rName, rQuantity := range container.Resources.Requests { + switch rName { + case api.ResourceMemory: + if mem := rQuantity.Value(); mem > result.Memory { + result.Memory = mem + } + case api.ResourceCPU: + if cpu := rQuantity.MilliValue(); cpu > result.MilliCPU { + result.MilliCPU = cpu + } + case api.ResourceNvidiaGPU: + if gpu := rQuantity.Value(); gpu > result.NvidiaGPU { + result.NvidiaGPU = gpu + } + default: + if api.IsOpaqueIntResourceName(rName) { + // Lazily allocate this map only if required. + if result.OpaqueIntResources == nil { + result.OpaqueIntResources = map[api.ResourceName]int64{} + } + value := rQuantity.Value() + if value > result.OpaqueIntResources[rName] { + result.OpaqueIntResources[rName] = value + } + } + } } } return &result @@ -471,7 +505,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N // We couldn't parse metadata - fallback to computing it. podRequest = GetResourceRequest(pod) } - if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 { + if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 && len(podRequest.OpaqueIntResources) == 0 { return len(predicateFails) == 0, predicateFails, nil } @@ -485,6 +519,12 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N if allocatable.NvidiaGPU < podRequest.NvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU { predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceNvidiaGPU, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU)) } + for rName, rQuant := range podRequest.OpaqueIntResources { + if allocatable.OpaqueIntResources[rName] < rQuant+nodeInfo.RequestedResource().OpaqueIntResources[rName] { + predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.OpaqueIntResources[rName], nodeInfo.RequestedResource().OpaqueIntResources[rName], allocatable.OpaqueIntResources[rName])) + } + } + if glog.V(10) { // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is // not logged. There is visible performance gain from it. 
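The predicate change above sums opaque integer requests across regular containers and takes the per-resource maximum against init containers, mirroring the existing CPU/memory logic. The following is a minimal sketch of that behavior, not part of the patch: it assumes the pre-vendored package paths used in this change, and the pod literal and the name "foo" are illustrative only.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

func main() {
	// "foo" expands to pod.alpha.kubernetes.io/opaque-int-resource-foo.
	oir := api.OpaqueIntResourceName("foo")

	pod := &api.Pod{
		Spec: api.PodSpec{
			// Regular containers: opaque requests are summed (1 + 2 = 3).
			Containers: []api.Container{
				{Resources: api.ResourceRequirements{Requests: api.ResourceList{oir: resource.MustParse("1")}}},
				{Resources: api.ResourceRequirements{Requests: api.ResourceList{oir: resource.MustParse("2")}}},
			},
			// Init containers: the effective request is max(sum over containers, any single init container).
			InitContainers: []api.Container{
				{Resources: api.ResourceRequirements{Requests: api.ResourceList{oir: resource.MustParse("5")}}},
			},
		},
	}

	req := predicates.GetResourceRequest(pod)
	fmt.Println(req.OpaqueIntResources[oir]) // prints 5, i.e. max(1+2, 5)
}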
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go old mode 100755 new mode 100644 index 9c394ea34ce..a876d6af177 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -74,23 +74,30 @@ func (pvs FakePersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*api.P return nil, fmt.Errorf("Unable to find persistent volume: %s", pvID) } -func makeResources(milliCPU int64, memory int64, nvidiaGPUs int64, pods int64) api.NodeResources { +var ( + opaqueResourceA = api.OpaqueIntResourceName("AAA") + opaqueResourceB = api.OpaqueIntResourceName("BBB") +) + +func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) api.NodeResources { return api.NodeResources{ Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), api.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), api.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), + opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), }, } } -func makeAllocatableResources(milliCPU int64, memory int64, nvidiaGPUs int64, pods int64) api.ResourceList { +func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) api.ResourceList { return api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), api.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), api.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), + opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), } } @@ -98,13 +105,7 @@ func newResourcePod(usage ...schedulercache.Resource) *api.Pod { containers := []api.Container{} for _, req := range usage { containers = append(containers, api.Container{ - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(req.MilliCPU, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(req.Memory, resource.BinarySI), - api.ResourceNvidiaGPU: *resource.NewQuantity(req.NvidiaGPU, resource.DecimalSI), - }, - }, + Resources: api.ResourceRequirements{Requests: req.ResourceList()}, }) } return &api.Pod{ @@ -233,10 +234,105 @@ func TestPodFitsResources(t *testing.T) { fits: true, test: "equal edge case for init container", }, + { + pod: newResourcePod(schedulercache.Resource{OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}), + nodeInfo: schedulercache.NewNodeInfo(newResourcePod(schedulercache.Resource{})), + fits: true, + test: "opaque resource fits", + }, + { + pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}), schedulercache.Resource{OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}), + nodeInfo: schedulercache.NewNodeInfo(newResourcePod(schedulercache.Resource{})), + fits: true, + test: "opaque resource fits for init container", + }, + { + pod: newResourcePod( + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 10}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 0}})), + fits: false, + test: "opaque resource capacity enforced", + reasons: 
[]algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 10, 0, 5)}, + }, + { + pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}), + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 10}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 0}})), + fits: false, + test: "opaque resource capacity enforced for init container", + reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 10, 0, 5)}, + }, + { + pod: newResourcePod( + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 5}})), + fits: false, + test: "opaque resource allocatable enforced", + reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 1, 5, 5)}, + }, + { + pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}), + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 5}})), + fits: false, + test: "opaque resource allocatable enforced for init container", + reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 1, 5, 5)}, + }, + { + pod: newResourcePod( + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}, + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 2}})), + fits: false, + test: "opaque resource allocatable enforced for multiple containers", + reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 6, 2, 5)}, + }, + { + pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}), + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}, + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 2}})), + fits: true, + test: "opaque resource allocatable admits multiple init containers", + }, + { + pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}), + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 6}}, + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 2}})), + fits: false, + test: "opaque resource allocatable enforced for multiple init containers", + reasons: 
[]algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 6, 2, 5)}, + }, + { + pod: newResourcePod( + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceB: 1}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0})), + fits: false, + test: "opaque resource allocatable enforced for unknown resource", + reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceB, 1, 0, 0)}, + }, + { + pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}), + schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceB: 1}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0})), + fits: false, + test: "opaque resource allocatable enforced for unknown resource for init container", + reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceB, 1, 0, 0)}, + }, } for _, test := range enoughPodsTests { - node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}} + node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5)}} test.nodeInfo.SetNode(&node) fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo) if err != nil { @@ -291,7 +387,7 @@ func TestPodFitsResources(t *testing.T) { }, } for _, test := range notEnoughPodsTests { - node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}} + node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1, 0)}} test.nodeInfo.SetNode(&node) fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo) if err != nil { @@ -1739,7 +1835,7 @@ func TestRunGeneralPredicates(t *testing.T) { newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})), node: &api.Node{ ObjectMeta: api.ObjectMeta{Name: "machine1"}, - Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}, + Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, }, fits: true, wErr: nil, @@ -1751,7 +1847,7 @@ func TestRunGeneralPredicates(t *testing.T) { newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})), node: &api.Node{ ObjectMeta: api.ObjectMeta{Name: "machine1"}, - Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}, + Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, }, fits: false, wErr: nil, @@ -1765,7 +1861,7 @@ func TestRunGeneralPredicates(t *testing.T) { pod: &api.Pod{}, nodeInfo: schedulercache.NewNodeInfo( newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})), - node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}}, + node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}}, fits: true, wErr: nil, test: "no 
resources/port/host requested always fits on GPU machine", @@ -1774,7 +1870,7 @@ func TestRunGeneralPredicates(t *testing.T) { pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}), nodeInfo: schedulercache.NewNodeInfo( newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 1})), - node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}}, + node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}}, fits: false, wErr: nil, reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceNvidiaGPU, 1, 1, 1)}, @@ -1784,7 +1880,7 @@ func TestRunGeneralPredicates(t *testing.T) { pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}), nodeInfo: schedulercache.NewNodeInfo( newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 0})), - node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}}, + node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}}, fits: true, wErr: nil, test: "enough GPU resource", @@ -1798,7 +1894,7 @@ func TestRunGeneralPredicates(t *testing.T) { nodeInfo: schedulercache.NewNodeInfo(), node: &api.Node{ ObjectMeta: api.ObjectMeta{Name: "machine1"}, - Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}, + Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, }, fits: false, wErr: nil, @@ -1810,7 +1906,7 @@ func TestRunGeneralPredicates(t *testing.T) { nodeInfo: schedulercache.NewNodeInfo(newPodWithPort(123)), node: &api.Node{ ObjectMeta: api.ObjectMeta{Name: "machine1"}, - Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}, + Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, }, fits: false, wErr: nil, @@ -2897,7 +2993,7 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) { ImagePullPolicy: "Always", // at least one requirement -> burstable pod Resources: api.ResourceRequirements{ - Requests: makeAllocatableResources(100, 100, 100, 100), + Requests: makeAllocatableResources(100, 100, 100, 100, 0), }, }, }, diff --git a/plugin/pkg/scheduler/schedulercache/BUILD b/plugin/pkg/scheduler/schedulercache/BUILD index 285a2d9c806..6bd9ab1ad73 100644 --- a/plugin/pkg/scheduler/schedulercache/BUILD +++ b/plugin/pkg/scheduler/schedulercache/BUILD @@ -21,6 +21,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/api/resource:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/labels:go_default_library", "//pkg/util/wait:go_default_library", diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 615de96b802..8b98ca8cacf 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" clientcache 
"k8s.io/kubernetes/pkg/client/cache" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" ) @@ -55,9 +56,22 @@ type NodeInfo struct { // Resource is a collection of compute resource. type Resource struct { - MilliCPU int64 - Memory int64 - NvidiaGPU int64 + MilliCPU int64 + Memory int64 + NvidiaGPU int64 + OpaqueIntResources map[api.ResourceName]int64 +} + +func (r *Resource) ResourceList() api.ResourceList { + result := api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(r.Memory, resource.BinarySI), + api.ResourceNvidiaGPU: *resource.NewQuantity(r.NvidiaGPU, resource.DecimalSI), + } + for rName, rQuant := range r.OpaqueIntResources { + result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI) + } + return result } // NewNodeInfo returns a ready to use empty NodeInfo object. @@ -169,10 +183,17 @@ func hasPodAffinityConstraints(pod *api.Pod) bool { // addPod adds pod information to this NodeInfo. func (n *NodeInfo) addPod(pod *api.Pod) { - cpu, mem, nvidia_gpu, non0_cpu, non0_mem := calculateResource(pod) - n.requestedResource.MilliCPU += cpu - n.requestedResource.Memory += mem - n.requestedResource.NvidiaGPU += nvidia_gpu + // cpu, mem, nvidia_gpu, non0_cpu, non0_mem := calculateResource(pod) + res, non0_cpu, non0_mem := calculateResource(pod) + n.requestedResource.MilliCPU += res.MilliCPU + n.requestedResource.Memory += res.Memory + n.requestedResource.NvidiaGPU += res.NvidiaGPU + if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 { + n.requestedResource.OpaqueIntResources = map[api.ResourceName]int64{} + } + for rName, rQuant := range res.OpaqueIntResources { + n.requestedResource.OpaqueIntResources[rName] += rQuant + } n.nonzeroRequest.MilliCPU += non0_cpu n.nonzeroRequest.Memory += non0_mem n.pods = append(n.pods, pod) @@ -213,10 +234,17 @@ func (n *NodeInfo) removePod(pod *api.Pod) error { n.pods[i] = n.pods[len(n.pods)-1] n.pods = n.pods[:len(n.pods)-1] // reduce the resource data - cpu, mem, nvidia_gpu, non0_cpu, non0_mem := calculateResource(pod) - n.requestedResource.MilliCPU -= cpu - n.requestedResource.Memory -= mem - n.requestedResource.NvidiaGPU -= nvidia_gpu + res, non0_cpu, non0_mem := calculateResource(pod) + + n.requestedResource.MilliCPU -= res.MilliCPU + n.requestedResource.Memory -= res.Memory + n.requestedResource.NvidiaGPU -= res.NvidiaGPU + if len(res.OpaqueIntResources) > 0 && n.requestedResource.OpaqueIntResources == nil { + n.requestedResource.OpaqueIntResources = map[api.ResourceName]int64{} + } + for rName, rQuant := range res.OpaqueIntResources { + n.requestedResource.OpaqueIntResources[rName] -= rQuant + } n.nonzeroRequest.MilliCPU -= non0_cpu n.nonzeroRequest.Memory -= non0_mem n.generation++ @@ -226,17 +254,31 @@ func (n *NodeInfo) removePod(pod *api.Pod) error { return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name) } -func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, non0_cpu int64, non0_mem int64) { +func calculateResource(pod *api.Pod) (res Resource, non0_cpu int64, non0_mem int64) { for _, c := range pod.Spec.Containers { - req := c.Resources.Requests - cpu += req.Cpu().MilliValue() - mem += req.Memory().Value() - nvidia_gpu += req.NvidiaGPU().Value() + for rName, rQuant := range c.Resources.Requests { + switch rName { + case api.ResourceCPU: + res.MilliCPU += rQuant.MilliValue() + case api.ResourceMemory: + res.Memory += rQuant.Value() + 
case api.ResourceNvidiaGPU: + res.NvidiaGPU += rQuant.Value() + default: + if api.IsOpaqueIntResourceName(rName) { + // Lazily allocate opaque resource map. + if res.OpaqueIntResources == nil { + res.OpaqueIntResources = map[api.ResourceName]int64{} + } + res.OpaqueIntResources[rName] += rQuant.Value() + } + } + } - non0_cpu_req, non0_mem_req := priorityutil.GetNonzeroRequests(&req) + non0_cpu_req, non0_mem_req := priorityutil.GetNonzeroRequests(&c.Resources.Requests) non0_cpu += non0_cpu_req non0_mem += non0_mem_req - // No non-zero resources for GPUs + // No non-zero resources for GPUs or opaque resources. } return } @@ -244,10 +286,26 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no // Sets the overall node information. func (n *NodeInfo) SetNode(node *api.Node) error { n.node = node - n.allocatableResource.MilliCPU = node.Status.Allocatable.Cpu().MilliValue() - n.allocatableResource.Memory = node.Status.Allocatable.Memory().Value() - n.allocatableResource.NvidiaGPU = node.Status.Allocatable.NvidiaGPU().Value() - n.allowedPodNumber = int(node.Status.Allocatable.Pods().Value()) + for rName, rQuant := range node.Status.Allocatable { + switch rName { + case api.ResourceCPU: + n.allocatableResource.MilliCPU = rQuant.MilliValue() + case api.ResourceMemory: + n.allocatableResource.Memory = rQuant.Value() + case api.ResourceNvidiaGPU: + n.allocatableResource.NvidiaGPU = rQuant.Value() + case api.ResourcePods: + n.allowedPodNumber = int(rQuant.Value()) + default: + if api.IsOpaqueIntResourceName(rName) { + // Lazily allocate opaque resource map. + if n.allocatableResource.OpaqueIntResources == nil { + n.allocatableResource.OpaqueIntResources = map[api.ResourceName]int64{} + } + n.allocatableResource.OpaqueIntResources[rName] = rQuant.Value() + } + } + } n.generation++ return nil } diff --git a/test/e2e/BUILD b/test/e2e/BUILD index 248a7def67a..833447d61ee 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -72,6 +72,7 @@ go_library( "networking_perf.go", "node_problem_detector.go", "nodeoutofdisk.go", + "opaque_resource.go", "pd.go", "persistent_volumes.go", "petset.go", diff --git a/test/e2e/opaque_resource.go b/test/e2e/opaque_resource.go new file mode 100644 index 00000000000..28750309158 --- /dev/null +++ b/test/e2e/opaque_resource.go @@ -0,0 +1,373 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "strings" + "sync" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/system" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", func() { + f := framework.NewDefaultFramework("opaque-resource") + opaqueResName := api.OpaqueIntResourceName("foo") + var node *api.Node + + BeforeEach(func() { + if node == nil { + // Priming invocation; select the first non-master node. + nodes, err := f.ClientSet.Core().Nodes().List(api.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + for _, n := range nodes.Items { + if !system.IsMasterNode(&n) { + node = &n + break + } + } + if node == nil { + Fail("unable to select a non-master node") + } + } + + removeOpaqueResource(f, node.Name, opaqueResName) + addOpaqueResource(f, node.Name, opaqueResName) + }) + + It("should not break pods that do not consume opaque integer resources.", func() { + By("Creating a vanilla pod") + requests := api.ResourceList{api.ResourceCPU: resource.MustParse("0.1")} + limits := api.ResourceList{api.ResourceCPU: resource.MustParse("0.2")} + pod := newTestPod(f, "without-oir", requests, limits) + + By("Observing an event that indicates the pod was scheduled") + action := func() error { + _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) + return err + } + predicate := func(e *api.Event) bool { + return e.Type == api.EventTypeNormal && + e.Reason == "Scheduled" && + // Here we don't check for the bound node name since it can land on + // any one (this pod doesn't require any of the opaque resource.) + strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v", pod.Name)) + } + success, err := observeEventAfterAction(f, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) + }) + + It("should schedule pods that do consume opaque integer resources.", func() { + By("Creating a pod that requires less of the opaque resource than is allocatable on a node.") + requests := api.ResourceList{ + api.ResourceCPU: resource.MustParse("0.1"), + opaqueResName: resource.MustParse("1"), + } + limits := api.ResourceList{ + api.ResourceCPU: resource.MustParse("0.2"), + opaqueResName: resource.MustParse("2"), + } + pod := newTestPod(f, "min-oir", requests, limits) + + By("Observing an event that indicates the pod was scheduled") + action := func() error { + _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) + return err + } + predicate := func(e *api.Event) bool { + return e.Type == api.EventTypeNormal && + e.Reason == "Scheduled" && + strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name)) + } + success, err := observeEventAfterAction(f, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) + }) + + It("should not schedule pods that exceed the available amount of opaque integer resource.", func() { + By("Creating a pod that requires more of the opaque resource than is allocatable on any node") + requests := api.ResourceList{opaqueResName: resource.MustParse("6")} + limits := api.ResourceList{} + + By("Observing an event that indicates the pod was not scheduled") + action := func() error { + _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(newTestPod(f, "over-max-oir", requests, limits)) + return err + } + predicate := func(e *api.Event) bool { + return e.Type == "Warning" && + e.Reason == "FailedScheduling" && + strings.Contains(e.Message, "failed to fit in any node") + } + success, err := observeEventAfterAction(f, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) + }) + + 
It("should account opaque integer resources in pods with multiple containers.", func() { + By("Creating a pod with two containers that together require less of the opaque resource than is allocatable on a node") + requests := api.ResourceList{opaqueResName: resource.MustParse("1")} + limits := api.ResourceList{} + image := framework.GetPauseImageName(f.ClientSet) + // This pod consumes 2 "foo" resources. + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "mult-container-oir", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "pause", + Image: image, + Resources: api.ResourceRequirements{ + Requests: requests, + Limits: limits, + }, + }, + { + Name: "pause-sidecar", + Image: image, + Resources: api.ResourceRequirements{ + Requests: requests, + Limits: limits, + }, + }, + }, + }, + } + + By("Observing an event that indicates the pod was scheduled") + action := func() error { + _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) + return err + } + predicate := func(e *api.Event) bool { + return e.Type == api.EventTypeNormal && + e.Reason == "Scheduled" && + strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name)) + } + success, err := observeEventAfterAction(f, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) + + By("Creating a pod with two containers that together require more of the opaque resource than is allocatable on any node") + requests = api.ResourceList{opaqueResName: resource.MustParse("3")} + limits = api.ResourceList{} + // This pod consumes 6 "foo" resources. + pod = &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "mult-container-over-max-oir", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "pause", + Image: image, + Resources: api.ResourceRequirements{ + Requests: requests, + Limits: limits, + }, + }, + { + Name: "pause-sidecar", + Image: image, + Resources: api.ResourceRequirements{ + Requests: requests, + Limits: limits, + }, + }, + }, + }, + } + + By("Observing an event that indicates the pod was not scheduled") + action = func() error { + _, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) + return err + } + predicate = func(e *api.Event) bool { + return e.Type == "Warning" && + e.Reason == "FailedScheduling" && + strings.Contains(e.Message, "failed to fit in any node") + } + success, err = observeEventAfterAction(f, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) + }) +}) + +// Adds the opaque resource to a node. +func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName api.ResourceName) { + action := func() error { + patch := []byte(fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`, escapeForJSONPatch(opaqueResName))) + return f.ClientSet.Core().RESTClient().Patch(api.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error() + } + predicate := func(n *api.Node) bool { + capacity, foundCap := n.Status.Capacity[opaqueResName] + allocatable, foundAlloc := n.Status.Allocatable[opaqueResName] + return foundCap && capacity.MilliValue() == int64(5000) && + foundAlloc && allocatable.MilliValue() == int64(5000) + } + success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) +} + +// Removes the opaque resource from a node. 
+func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName api.ResourceName) { + action := func() error { + patch := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/status/capacity/%s"}]`, escapeForJSONPatch(opaqueResName))) + f.ClientSet.Core().RESTClient().Patch(api.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do() + return nil // Ignore error -- the opaque resource may not exist. + } + predicate := func(n *api.Node) bool { + _, foundCap := n.Status.Capacity[opaqueResName] + _, foundAlloc := n.Status.Allocatable[opaqueResName] + return !foundCap && !foundAlloc + } + success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) +} + +func escapeForJSONPatch(resName api.ResourceName) string { + // Escape forward slashes in the resource name per the JSON Pointer spec. + // See https://tools.ietf.org/html/rfc6901#section-3 + return strings.Replace(string(resName), "/", "~1", -1) +} + +// Returns true if a node update matching the predicate was emitted from the +// system after performing the supplied action. +func observeNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*api.Node) bool, action func() error) (bool, error) { + observedMatchingNode := false + nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName) + informerStartedChan := make(chan struct{}) + var informerStartedGuard sync.Once + + _, controller := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + options.FieldSelector = nodeSelector + ls, err := f.ClientSet.Core().Nodes().List(options) + return ls, err + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + options.FieldSelector = nodeSelector + w, err := f.ClientSet.Core().Nodes().Watch(options) + // Signal parent goroutine that watching has begun. + informerStartedGuard.Do(func() { close(informerStartedChan) }) + return w, err + }, + }, + &api.Node{}, + 0, + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + n, ok := newObj.(*api.Node) + Expect(ok).To(Equal(true)) + if nodePredicate(n) { + observedMatchingNode = true + } + }, + }, + ) + + // Start the informer and block this goroutine waiting for the started signal. + informerStopChan := make(chan struct{}) + defer func() { close(informerStopChan) }() + go controller.Run(informerStopChan) + <-informerStartedChan + + // Invoke the action function. + err := action() + if err != nil { + return false, err + } + + // Poll whether the informer has found a matching node update with a timeout. + // Wait up 2 minutes polling every second. + timeout := 2 * time.Minute + interval := 1 * time.Second + err = wait.Poll(interval, timeout, func() (bool, error) { + return observedMatchingNode, nil + }) + return err == nil, err +} + +// Returns true if an event matching the predicate was emitted from the system +// after performing the supplied action. +func observeEventAfterAction(f *framework.Framework, eventPredicate func(*api.Event) bool, action func() error) (bool, error) { + observedMatchingEvent := false + + // Create an informer to list/watch events from the test framework namespace. 
+ _, controller := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + ls, err := f.ClientSet.Core().Events(f.Namespace.Name).List(options) + return ls, err + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + w, err := f.ClientSet.Core().Events(f.Namespace.Name).Watch(options) + return w, err + }, + }, + &api.Event{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + e, ok := obj.(*api.Event) + By(fmt.Sprintf("Considering event: \nType = [%s], Reason = [%s], Message = [%s]", e.Type, e.Reason, e.Message)) + Expect(ok).To(Equal(true)) + if ok && eventPredicate(e) { + observedMatchingEvent = true + } + }, + }, + ) + + informerStopChan := make(chan struct{}) + defer func() { close(informerStopChan) }() + go controller.Run(informerStopChan) + + // Invoke the action function. + err := action() + if err != nil { + return false, err + } + + // Poll whether the informer has found a matching event with a timeout. + // Wait up 2 minutes polling every second. + timeout := 2 * time.Minute + interval := 1 * time.Second + err = wait.Poll(interval, timeout, func() (bool, error) { + return observedMatchingEvent, nil + }) + return err == nil, err +}
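The two helpers above, JSON-Patching the node status subresource and then watching for the matching node update, generalize beyond the fixed value "5" used by addOpaqueResource. Below is a hypothetical variant, a sketch only and not part of the patch, that advertises an arbitrary amount; it assumes the same package and imports as test/e2e/opaque_resource.go and reuses only calls that already appear in that file.

// setOpaqueResource is a hypothetical helper (illustrative sketch only): it
// advertises `amount` units of an opaque integer resource on the named node
// and waits until the change is visible.
func setOpaqueResource(f *framework.Framework, nodeName string, resName api.ResourceName, amount int64) {
	action := func() error {
		patch := []byte(fmt.Sprintf(
			`[{"op": "add", "path": "/status/capacity/%s", "value": "%d"}]`,
			escapeForJSONPatch(resName), amount))
		return f.ClientSet.Core().RESTClient().Patch(api.JSONPatchType).
			Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error()
	}
	predicate := func(n *api.Node) bool {
		// Wait until both capacity and allocatable reflect the new amount.
		capacity, foundCap := n.Status.Capacity[resName]
		allocatable, foundAlloc := n.Status.Allocatable[resName]
		return foundCap && capacity.Value() == amount &&
			foundAlloc && allocatable.Value() == amount
	}
	success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
	Expect(err).NotTo(HaveOccurred())
	Expect(success).To(Equal(true))
}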