Support opaque integer resource accounting.

- Prevents the kubelet from overwriting node capacity during status sync.
- Handles opaque integer resources in the scheduler.
  - Adds scheduler predicate tests for opaque resources.
- Validates opaque integer resources:
  - Ensures supplied opaque integer quantities in node capacity,
    node allocatable, pod requests, and pod limits are integers.
  - Adds tests for the new validation logic (node update and pod spec).
- Adds e2e tests for opaque integer resources.
Connor Doyle 2016-09-26 08:11:31 -07:00
parent 1cba31af40
commit c93646e8da
11 changed files with 883 additions and 66 deletions
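For context, a node advertises an opaque integer resource by patching its own status. The sketch below is adapted from the addOpaqueResource e2e helper later in this commit; the clientset import path is an assumption based on the source layout of this era.

package example

import (
	"k8s.io/kubernetes/pkg/api"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// advertiseOIR adds 5 units of an opaque integer resource to a node's
// reported capacity. The "~1" escapes the "/" in the resource name per the
// JSON Pointer spec (RFC 6901), as escapeForJSONPatch does in the e2e test.
func advertiseOIR(c clientset.Interface, nodeName string) error {
	patch := []byte(`[{"op": "add", "path": "/status/capacity/pod.alpha.kubernetes.io~1opaque-int-resource-foo", "value": "5"}]`)
	return c.Core().RESTClient().Patch(api.JSONPatchType).
		Resource("nodes").Name(nodeName).SubResource("status").
		Body(patch).Do().Error()
}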

View File

@@ -120,6 +120,22 @@ func IsStandardContainerResourceName(str string) bool {
return standardContainerResources.Has(str)
}
// IsOpaqueIntResourceName returns true if the resource name has the opaque
// integer resource prefix.
func IsOpaqueIntResourceName(name ResourceName) bool {
return strings.HasPrefix(string(name), ResourceOpaqueIntPrefix)
}
// OpaqueIntResourceName returns a ResourceName with the canonical opaque
// integer prefix prepended. If the argument already has the prefix, it is
// returned unmodified.
func OpaqueIntResourceName(name string) ResourceName {
if IsOpaqueIntResourceName(ResourceName(name)) {
return ResourceName(name)
}
return ResourceName(fmt.Sprintf("%s%s", ResourceOpaqueIntPrefix, name))
}
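As a standalone sketch, the two helpers compose like this (plain strings stand in for ResourceName; the prefix value comes from the types change below):

package main

import (
	"fmt"
	"strings"
)

// Stand-in for api.ResourceOpaqueIntPrefix.
const prefix = "pod.alpha.kubernetes.io/opaque-int-resource-"

func isOpaque(name string) bool { return strings.HasPrefix(name, prefix) }

func opaqueName(name string) string {
	if isOpaque(name) {
		return name // already prefixed: returned unmodified
	}
	return prefix + name
}

func main() {
	n := opaqueName("foo")
	fmt.Println(n)                  // pod.alpha.kubernetes.io/opaque-int-resource-foo
	fmt.Println(isOpaque(n))        // true
	fmt.Println(opaqueName(n) == n) // true: prefixing is idempotent
}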
var standardLimitRangeTypes = sets.NewString(
string(LimitTypePod),
string(LimitTypeContainer),
@@ -193,7 +209,7 @@ var integerResources = sets.NewString(
// IsIntegerResourceName returns true if the resource is measured in integer values
func IsIntegerResourceName(str string) bool {
return integerResources.Has(str)
return integerResources.Has(str) || IsOpaqueIntResourceName(ResourceName(str))
}
// NewDeleteOptions returns a DeleteOptions indicating the resource should

View File

@@ -2631,6 +2631,11 @@ const (
// Number of Pods that may be running on this Node: see ResourcePods
)
const (
// Namespace prefix for opaque counted resources (alpha).
ResourceOpaqueIntPrefix = "pod.alpha.kubernetes.io/opaque-int-resource-"
)
// ResourceList is a set of (resource name, quantity) pairs.
type ResourceList map[ResourceName]resource.Quantity

View File

@@ -2845,6 +2845,17 @@ func ValidateNodeUpdate(node, oldNode *api.Node) field.ErrorList {
// allErrs = append(allErrs, field.Invalid("status", node.Status, "must be empty"))
// }
// Validate resource quantities in capacity.
for k, v := range node.Status.Capacity {
resPath := field.NewPath("status", "capacity", string(k))
allErrs = append(allErrs, ValidateResourceQuantityValue(string(k), v, resPath)...)
}
// Validate resource quantities in allocatable.
for k, v := range node.Status.Allocatable {
resPath := field.NewPath("status", "allocatable", string(k))
allErrs = append(allErrs, ValidateResourceQuantityValue(string(k), v, resPath)...)
}
// Validate no duplicate addresses in node status.
addresses := make(map[api.NodeAddress]bool)
for i, address := range node.Status.Addresses {
@@ -3236,9 +3247,10 @@ func ValidateResourceRequirements(requirements *api.ResourceRequirements, fldPat
fldPath := limPath.Key(string(resourceName))
// Validate resource name.
allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...)
if api.IsStandardResourceName(string(resourceName)) {
allErrs = append(allErrs, validateBasicResource(quantity, fldPath.Key(string(resourceName)))...)
}
// Validate resource quantity.
allErrs = append(allErrs, ValidateResourceQuantityValue(string(resourceName), quantity, fldPath)...)
// Check that request <= limit.
requestQuantity, exists := requirements.Requests[resourceName]
if exists {
@@ -3254,10 +3266,10 @@ func ValidateResourceRequirements(requirements *api.ResourceRequirements, fldPat
fldPath := reqPath.Key(string(resourceName))
// Validate resource name.
allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...)
if api.IsStandardResourceName(string(resourceName)) {
allErrs = append(allErrs, validateBasicResource(quantity, fldPath.Key(string(resourceName)))...)
}
// Validate resource quantity.
allErrs = append(allErrs, ValidateResourceQuantityValue(string(resourceName), quantity, fldPath)...)
}
return allErrs
}
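For integer resources the quantity check reduces to "no fractional milli-units". A sketch of the semantics assumed by ValidateResourceQuantityValue above:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/resource"
)

// isIntegral reports whether a quantity has no fractional component, i.e.
// its milli-value is a whole multiple of 1000.
func isIntegral(q resource.Quantity) bool {
	return q.MilliValue()%1000 == 0
}

func main() {
	fmt.Println(isIntegral(resource.MustParse("5")))    // true: accepted
	fmt.Println(isIntegral(resource.MustParse("500m"))) // false: rejected
	fmt.Println(isIntegral(resource.MustParse("2.5")))  // false: rejected
}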

View File

@@ -3665,6 +3665,52 @@ func TestValidatePod(t *testing.T) {
},
Spec: validPodSpec,
},
{ // valid opaque integer resources for init container
ObjectMeta: api.ObjectMeta{Name: "valid-opaque-int", Namespace: "ns"},
Spec: api.PodSpec{
InitContainers: []api.Container{
{
Name: "valid-opaque-int",
Image: "image",
ImagePullPolicy: "IfNotPresent",
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("10"),
},
Limits: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("20"),
},
},
},
},
Containers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
{ // valid opaque integer resources for regular container
ObjectMeta: api.ObjectMeta{Name: "valid-opaque-int", Namespace: "ns"},
Spec: api.PodSpec{
InitContainers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}},
Containers: []api.Container{
{
Name: "valid-opaque-int",
Image: "image",
ImagePullPolicy: "IfNotPresent",
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("10"),
},
Limits: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("20"),
},
},
},
},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
}
for _, pod := range successCases {
if errs := ValidatePod(&pod); len(errs) != 0 {
@@ -4155,6 +4201,112 @@ func TestValidatePod(t *testing.T) {
},
Spec: validPodSpec,
},
"invalid opaque integer resource requirement: request must be <= limit": {
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "invalid",
Image: "image",
ImagePullPolicy: "IfNotPresent",
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("2"),
},
Limits: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("1"),
},
},
},
},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
"invalid fractional opaque integer resource in container request": {
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "invalid",
Image: "image",
ImagePullPolicy: "IfNotPresent",
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("500m"),
},
},
},
},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
"invalid fractional opaque integer resource in init container request": {
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"},
Spec: api.PodSpec{
InitContainers: []api.Container{
{
Name: "invalid",
Image: "image",
ImagePullPolicy: "IfNotPresent",
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("500m"),
},
},
},
},
Containers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
"invalid fractional opaque integer resource in container limit": {
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "invalid",
Image: "image",
ImagePullPolicy: "IfNotPresent",
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("5"),
},
Limits: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("2.5"),
},
},
},
},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
"invalid fractional opaque integer resource in init container limit": {
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: "ns"},
Spec: api.PodSpec{
InitContainers: []api.Container{
{
Name: "invalid",
Image: "image",
ImagePullPolicy: "IfNotPresent",
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("5"),
},
Limits: api.ResourceList{
api.OpaqueIntResourceName("A"): resource.MustParse("2.5"),
},
},
},
},
Containers: []api.Container{{Name: "ctr", Image: "image", ImagePullPolicy: "IfNotPresent"}},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
}
for k, v := range errorCases {
if errs := ValidatePod(&v); len(errs) == 0 {
@@ -6347,6 +6499,60 @@ func TestValidateNodeUpdate(t *testing.T) {
},
},
}, false},
{api.Node{
ObjectMeta: api.ObjectMeta{
Name: "valid-opaque-int-resources",
},
}, api.Node{
ObjectMeta: api.ObjectMeta{
Name: "valid-opaque-int-resources",
},
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
api.OpaqueIntResourceName("A"): resource.MustParse("5"),
api.OpaqueIntResourceName("B"): resource.MustParse("10"),
},
},
}, true},
{api.Node{
ObjectMeta: api.ObjectMeta{
Name: "invalid-fractional-opaque-int-capacity",
},
}, api.Node{
ObjectMeta: api.ObjectMeta{
Name: "invalid-fractional-opaque-int-capacity",
},
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
api.OpaqueIntResourceName("A"): resource.MustParse("500m"),
},
},
}, false},
{api.Node{
ObjectMeta: api.ObjectMeta{
Name: "invalid-fractional-opaque-int-allocatable",
},
}, api.Node{
ObjectMeta: api.ObjectMeta{
Name: "invalid-fractional-opaque-int-allocatable",
},
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
api.OpaqueIntResourceName("A"): resource.MustParse("5"),
},
Allocatable: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
api.OpaqueIntResourceName("A"): resource.MustParse("4.5"),
},
},
}, false},
}
for i, test := range tests {
test.oldNode.ObjectMeta.ResourceVersion = "1"

View File

@@ -436,23 +436,32 @@ func (kl *Kubelet) setNodeAddress(node *api.Node) error {
}
func (kl *Kubelet) setNodeStatusMachineInfo(node *api.Node) {
// Note: avoid blindly overwriting the capacity in case opaque
// resources are being advertised.
if node.Status.Capacity == nil {
node.Status.Capacity = api.ResourceList{}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI),
}
node.Status.Capacity[api.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI)
node.Status.Capacity[api.ResourceMemory] = resource.MustParse("0Gi")
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI)
node.Status.Capacity[api.ResourceNvidiaGPU] = *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI)
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info)
for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
node.Status.Capacity[rName] = rCap
}
if kl.podsPerCore > 0 {
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI)
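The point of the change above, as a self-contained sketch: merging capacity key by key preserves entries (such as opaque resources patched in out of band) that the old wholesale map assignment would have dropped.

package main

import "fmt"

func main() {
	// Capacity as previously reported, including an opaque resource added
	// by a third party via the node status patch.
	capacity := map[string]string{
		"cpu": "8",
		"pod.alpha.kubernetes.io/opaque-int-resource-foo": "5",
	}
	// Fresh machine info knows nothing about the opaque resource.
	fromMachineInfo := map[string]string{"cpu": "8", "memory": "16Gi"}

	// Old behavior: capacity = fromMachineInfo (drops the opaque entry).
	// New behavior: merge per key, as the kubelet now does.
	for k, v := range fromMachineInfo {
		capacity[k] = v
	}
	fmt.Println(capacity) // the opaque-int-resource-foo entry survives the sync
}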

View File

@@ -430,19 +430,53 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *
func GetResourceRequest(pod *api.Pod) *schedulercache.Resource {
result := schedulercache.Resource{}
for _, container := range pod.Spec.Containers {
requests := container.Resources.Requests
result.Memory += requests.Memory().Value()
result.MilliCPU += requests.Cpu().MilliValue()
result.NvidiaGPU += requests.NvidiaGPU().Value()
for rName, rQuantity := range container.Resources.Requests {
switch rName {
case api.ResourceMemory:
result.Memory += rQuantity.Value()
case api.ResourceCPU:
result.MilliCPU += rQuantity.MilliValue()
case api.ResourceNvidiaGPU:
result.NvidiaGPU += rQuantity.Value()
default:
if api.IsOpaqueIntResourceName(rName) {
// Lazily allocate this map only if required.
if result.OpaqueIntResources == nil {
result.OpaqueIntResources = map[api.ResourceName]int64{}
}
result.OpaqueIntResources[rName] += rQuantity.Value()
}
}
}
}
// take max_resource(sum_pod, any_init_container)
for _, container := range pod.Spec.InitContainers {
requests := container.Resources.Requests
if mem := requests.Memory().Value(); mem > result.Memory {
result.Memory = mem
}
if cpu := requests.Cpu().MilliValue(); cpu > result.MilliCPU {
result.MilliCPU = cpu
for rName, rQuantity := range container.Resources.Requests {
switch rName {
case api.ResourceMemory:
if mem := rQuantity.Value(); mem > result.Memory {
result.Memory = mem
}
case api.ResourceCPU:
if cpu := rQuantity.MilliValue(); cpu > result.MilliCPU {
result.MilliCPU = cpu
}
case api.ResourceNvidiaGPU:
if gpu := rQuantity.Value(); gpu > result.NvidiaGPU {
result.NvidiaGPU = gpu
}
default:
if api.IsOpaqueIntResourceName(rName) {
// Lazily allocate this map only if required.
if result.OpaqueIntResources == nil {
result.OpaqueIntResources = map[api.ResourceName]int64{}
}
value := rQuantity.Value()
if value > result.OpaqueIntResources[rName] {
result.OpaqueIntResources[rName] = value
}
}
}
}
}
return &result
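The aggregation rule encoded above, reduced to plain integers: regular container requests sum, while each init container is compared as a max against the running total, since init containers execute sequentially and only the largest matters. A sketch:

package main

import "fmt"

// podRequest computes the effective request for a single resource:
// max(sum of regular containers, largest single init container).
func podRequest(containers, initContainers []int64) int64 {
	var total int64
	for _, c := range containers {
		total += c
	}
	for _, ic := range initContainers {
		if ic > total {
			total = ic
		}
	}
	return total
}

func main() {
	fmt.Println(podRequest([]int64{3, 3}, []int64{5}))  // 6: the container sum dominates
	fmt.Println(podRequest([]int64{3, 3}, []int64{10})) // 10: the init container dominates
}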
@@ -471,7 +505,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
// We couldn't parse metadata - fallback to computing it.
podRequest = GetResourceRequest(pod)
}
if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 {
if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 && len(podRequest.OpaqueIntResources) == 0 {
return len(predicateFails) == 0, predicateFails, nil
}
@@ -485,6 +519,12 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
if allocatable.NvidiaGPU < podRequest.NvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceNvidiaGPU, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU))
}
for rName, rQuant := range podRequest.OpaqueIntResources {
if allocatable.OpaqueIntResources[rName] < rQuant+nodeInfo.RequestedResource().OpaqueIntResources[rName] {
predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.OpaqueIntResources[rName], nodeInfo.RequestedResource().OpaqueIntResources[rName], allocatable.OpaqueIntResources[rName]))
}
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
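The per-resource fit test applied to opaque resources above is the same inequality used for CPU, memory, and GPU. As a sketch:

package main

import "fmt"

// fits reports whether a request can be granted: the pod's request plus what
// already-scheduled pods have requested must stay within allocatable.
func fits(allocatable, requested, alreadyRequested int64) bool {
	return requested+alreadyRequested <= allocatable
}

func main() {
	fmt.Println(fits(5, 1, 0)) // true: 1 of 5 units requested
	fmt.Println(fits(5, 1, 5)) // false: node already fully requested
	fmt.Println(fits(0, 1, 0)) // false: resource not advertised on this node
}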

View File

@@ -74,23 +74,30 @@ func (pvs FakePersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*api.P
return nil, fmt.Errorf("Unable to find persistent volume: %s", pvID)
}
func makeResources(milliCPU int64, memory int64, nvidiaGPUs int64, pods int64) api.NodeResources {
var (
opaqueResourceA = api.OpaqueIntResourceName("AAA")
opaqueResourceB = api.OpaqueIntResourceName("BBB")
)
func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) api.NodeResources {
return api.NodeResources{
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
},
}
}
func makeAllocatableResources(milliCPU int64, memory int64, nvidiaGPUs int64, pods int64) api.ResourceList {
func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) api.ResourceList {
return api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
}
}
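(The TestPodFitsResources cases below build their node from makeResources(10, 20, 0, 32, 5), so opaqueResourceA has capacity and allocatable of 5; that 5 appears as the final argument of the expected NewInsufficientResourceError reasons.)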
@@ -98,13 +105,7 @@ func newResourcePod(usage ...schedulercache.Resource) *api.Pod {
containers := []api.Container{}
for _, req := range usage {
containers = append(containers, api.Container{
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(req.MilliCPU, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(req.Memory, resource.BinarySI),
api.ResourceNvidiaGPU: *resource.NewQuantity(req.NvidiaGPU, resource.DecimalSI),
},
},
Resources: api.ResourceRequirements{Requests: req.ResourceList()},
})
}
return &api.Pod{
@@ -233,10 +234,105 @@ func TestPodFitsResources(t *testing.T) {
fits: true,
test: "equal edge case for init container",
},
{
pod: newResourcePod(schedulercache.Resource{OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}),
nodeInfo: schedulercache.NewNodeInfo(newResourcePod(schedulercache.Resource{})),
fits: true,
test: "opaque resource fits",
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}), schedulercache.Resource{OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}),
nodeInfo: schedulercache.NewNodeInfo(newResourcePod(schedulercache.Resource{})),
fits: true,
test: "opaque resource fits for init container",
},
{
pod: newResourcePod(
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 10}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 0}})),
fits: false,
test: "opaque resource capacity enforced",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 10, 0, 5)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}),
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 10}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 0}})),
fits: false,
test: "opaque resource capacity enforced for init container",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 10, 0, 5)},
},
{
pod: newResourcePod(
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 5}})),
fits: false,
test: "opaque resource allocatable enforced",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 1, 5, 5)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}),
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 1}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 5}})),
fits: false,
test: "opaque resource allocatable enforced for init container",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 1, 5, 5)},
},
{
pod: newResourcePod(
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}},
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 2}})),
fits: false,
test: "opaque resource allocatable enforced for multiple containers",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 6, 2, 5)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}),
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}},
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 2}})),
fits: true,
test: "opaque resource allocatable admits multiple init containers",
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}),
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 6}},
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 3}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceA: 2}})),
fits: false,
test: "opaque resource allocatable enforced for multiple init containers",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceA, 6, 2, 5)},
},
{
pod: newResourcePod(
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceB: 1}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0})),
fits: false,
test: "opaque resource allocatable enforced for unknown resource",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceB, 1, 0, 0)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{}),
schedulercache.Resource{MilliCPU: 1, Memory: 1, OpaqueIntResources: map[api.ResourceName]int64{opaqueResourceB: 1}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0})),
fits: false,
test: "opaque resource allocatable enforced for unknown resource for init container",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(opaqueResourceB, 1, 0, 0)},
},
}
for _, test := range enoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}}
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5)}}
test.nodeInfo.SetNode(&node)
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
@@ -291,7 +387,7 @@ func TestPodFitsResources(t *testing.T) {
},
}
for _, test := range notEnoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}}
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1, 0)}}
test.nodeInfo.SetNode(&node)
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
@@ -1739,7 +1835,7 @@ func TestRunGeneralPredicates(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
node: &api.Node{
ObjectMeta: api.ObjectMeta{Name: "machine1"},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
},
fits: true,
wErr: nil,
@@ -1751,7 +1847,7 @@
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})),
node: &api.Node{
ObjectMeta: api.ObjectMeta{Name: "machine1"},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
},
fits: false,
wErr: nil,
@@ -1765,7 +1861,7 @@
pod: &api.Pod{},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}},
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}},
fits: true,
wErr: nil,
test: "no resources/port/host requested always fits on GPU machine",
@@ -1774,7 +1870,7 @@
pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 1})),
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}},
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}},
fits: false,
wErr: nil,
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceNvidiaGPU, 1, 1, 1)},
@@ -1784,7 +1880,7 @@
pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 0})),
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}},
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}},
fits: true,
wErr: nil,
test: "enough GPU resource",
@@ -1798,7 +1894,7 @@
nodeInfo: schedulercache.NewNodeInfo(),
node: &api.Node{
ObjectMeta: api.ObjectMeta{Name: "machine1"},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
},
fits: false,
wErr: nil,
@@ -1810,7 +1906,7 @@
nodeInfo: schedulercache.NewNodeInfo(newPodWithPort(123)),
node: &api.Node{
ObjectMeta: api.ObjectMeta{Name: "machine1"},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
},
fits: false,
wErr: nil,
@@ -2897,7 +2993,7 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) {
ImagePullPolicy: "Always",
// at least one requirement -> burstable pod
Resources: api.ResourceRequirements{
Requests: makeAllocatableResources(100, 100, 100, 100),
Requests: makeAllocatableResources(100, 100, 100, 100, 0),
},
},
},

View File

@@ -21,6 +21,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/resource:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/util/wait:go_default_library",

View File

@@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
clientcache "k8s.io/kubernetes/pkg/client/cache"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)
@@ -55,9 +56,22 @@ type NodeInfo struct {
// Resource is a collection of compute resource.
type Resource struct {
MilliCPU int64
Memory int64
NvidiaGPU int64
MilliCPU int64
Memory int64
NvidiaGPU int64
OpaqueIntResources map[api.ResourceName]int64
}
func (r *Resource) ResourceList() api.ResourceList {
result := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(r.Memory, resource.BinarySI),
api.ResourceNvidiaGPU: *resource.NewQuantity(r.NvidiaGPU, resource.DecimalSI),
}
for rName, rQuant := range r.OpaqueIntResources {
result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI)
}
return result
}
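The recurring "lazily allocate this map" pattern in this file keeps the common case (no opaque resources) allocation-free. A minimal sketch of the idiom:

package main

import "fmt"

// acct allocates its opaque map only when an opaque resource is first seen.
type acct struct {
	opaque map[string]int64
}

func (a *acct) add(name string, v int64) {
	if a.opaque == nil {
		a.opaque = map[string]int64{}
	}
	a.opaque[name] += v
}

func main() {
	var a acct
	fmt.Println(a.opaque == nil) // true: nothing allocated yet
	a.add("pod.alpha.kubernetes.io/opaque-int-resource-foo", 2)
	a.add("pod.alpha.kubernetes.io/opaque-int-resource-foo", 3)
	fmt.Println(a.opaque) // map[pod.alpha.kubernetes.io/opaque-int-resource-foo:5]
}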
// NewNodeInfo returns a ready to use empty NodeInfo object.
@@ -169,10 +183,17 @@ func hasPodAffinityConstraints(pod *api.Pod) bool {
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *api.Pod) {
cpu, mem, nvidia_gpu, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += cpu
n.requestedResource.Memory += mem
n.requestedResource.NvidiaGPU += nvidia_gpu
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
n.requestedResource.NvidiaGPU += res.NvidiaGPU
if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 {
n.requestedResource.OpaqueIntResources = map[api.ResourceName]int64{}
}
for rName, rQuant := range res.OpaqueIntResources {
n.requestedResource.OpaqueIntResources[rName] += rQuant
}
n.nonzeroRequest.MilliCPU += non0_cpu
n.nonzeroRequest.Memory += non0_mem
n.pods = append(n.pods, pod)
@@ -213,10 +234,17 @@ func (n *NodeInfo) removePod(pod *api.Pod) error {
n.pods[i] = n.pods[len(n.pods)-1]
n.pods = n.pods[:len(n.pods)-1]
// reduce the resource data
cpu, mem, nvidia_gpu, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU -= cpu
n.requestedResource.Memory -= mem
n.requestedResource.NvidiaGPU -= nvidia_gpu
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU -= res.MilliCPU
n.requestedResource.Memory -= res.Memory
n.requestedResource.NvidiaGPU -= res.NvidiaGPU
if len(res.OpaqueIntResources) > 0 && n.requestedResource.OpaqueIntResources == nil {
n.requestedResource.OpaqueIntResources = map[api.ResourceName]int64{}
}
for rName, rQuant := range res.OpaqueIntResources {
n.requestedResource.OpaqueIntResources[rName] -= rQuant
}
n.nonzeroRequest.MilliCPU -= non0_cpu
n.nonzeroRequest.Memory -= non0_mem
n.generation++
@@ -226,17 +254,31 @@
return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
}
func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, non0_cpu int64, non0_mem int64) {
func calculateResource(pod *api.Pod) (res Resource, non0_cpu int64, non0_mem int64) {
for _, c := range pod.Spec.Containers {
req := c.Resources.Requests
cpu += req.Cpu().MilliValue()
mem += req.Memory().Value()
nvidia_gpu += req.NvidiaGPU().Value()
for rName, rQuant := range c.Resources.Requests {
switch rName {
case api.ResourceCPU:
res.MilliCPU += rQuant.MilliValue()
case api.ResourceMemory:
res.Memory += rQuant.Value()
case api.ResourceNvidiaGPU:
res.NvidiaGPU += rQuant.Value()
default:
if api.IsOpaqueIntResourceName(rName) {
// Lazily allocate opaque resource map.
if res.OpaqueIntResources == nil {
res.OpaqueIntResources = map[api.ResourceName]int64{}
}
res.OpaqueIntResources[rName] += rQuant.Value()
}
}
}
non0_cpu_req, non0_mem_req := priorityutil.GetNonzeroRequests(&req)
non0_cpu_req, non0_mem_req := priorityutil.GetNonzeroRequests(&c.Resources.Requests)
non0_cpu += non0_cpu_req
non0_mem += non0_mem_req
// No non-zero resources for GPUs
// No non-zero resources for GPUs or opaque resources.
}
return
}
@@ -244,10 +286,26 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no
// Sets the overall node information.
func (n *NodeInfo) SetNode(node *api.Node) error {
n.node = node
n.allocatableResource.MilliCPU = node.Status.Allocatable.Cpu().MilliValue()
n.allocatableResource.Memory = node.Status.Allocatable.Memory().Value()
n.allocatableResource.NvidiaGPU = node.Status.Allocatable.NvidiaGPU().Value()
n.allowedPodNumber = int(node.Status.Allocatable.Pods().Value())
for rName, rQuant := range node.Status.Allocatable {
switch rName {
case api.ResourceCPU:
n.allocatableResource.MilliCPU = rQuant.MilliValue()
case api.ResourceMemory:
n.allocatableResource.Memory = rQuant.Value()
case api.ResourceNvidiaGPU:
n.allocatableResource.NvidiaGPU = rQuant.Value()
case api.ResourcePods:
n.allowedPodNumber = int(rQuant.Value())
default:
if api.IsOpaqueIntResourceName(rName) {
// Lazily allocate opaque resource map.
if n.allocatableResource.OpaqueIntResources == nil {
n.allocatableResource.OpaqueIntResources = map[api.ResourceName]int64{}
}
n.allocatableResource.OpaqueIntResources[rName] = rQuant.Value()
}
}
}
n.generation++
return nil
}

View File

@@ -72,6 +72,7 @@ go_library(
"networking_perf.go",
"node_problem_detector.go",
"nodeoutofdisk.go",
"opaque_resource.go",
"pd.go",
"persistent_volumes.go",
"petset.go",

test/e2e/opaque_resource.go (new file, 373 lines)
View File

@@ -0,0 +1,373 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"strings"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/system"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", func() {
f := framework.NewDefaultFramework("opaque-resource")
opaqueResName := api.OpaqueIntResourceName("foo")
var node *api.Node
BeforeEach(func() {
if node == nil {
// Priming invocation; select the first non-master node.
nodes, err := f.ClientSet.Core().Nodes().List(api.ListOptions{})
Expect(err).NotTo(HaveOccurred())
for _, n := range nodes.Items {
if !system.IsMasterNode(&n) {
node = &n
break
}
}
if node == nil {
Fail("unable to select a non-master node")
}
}
removeOpaqueResource(f, node.Name, opaqueResName)
addOpaqueResource(f, node.Name, opaqueResName)
})
It("should not break pods that do not consume opaque integer resources.", func() {
By("Creating a vanilla pod")
requests := api.ResourceList{api.ResourceCPU: resource.MustParse("0.1")}
limits := api.ResourceList{api.ResourceCPU: resource.MustParse("0.2")}
pod := newTestPod(f, "without-oir", requests, limits)
By("Observing an event that indicates the pod was scheduled")
action := func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
predicate := func(e *api.Event) bool {
return e.Type == api.EventTypeNormal &&
e.Reason == "Scheduled" &&
// Here we don't check the bound node name since this pod can land on
// any node (it doesn't require any of the opaque resource).
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v", pod.Name))
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})
It("should schedule pods that do consume opaque integer resources.", func() {
By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
requests := api.ResourceList{
api.ResourceCPU: resource.MustParse("0.1"),
opaqueResName: resource.MustParse("1"),
}
limits := api.ResourceList{
api.ResourceCPU: resource.MustParse("0.2"),
opaqueResName: resource.MustParse("2"),
}
pod := newTestPod(f, "min-oir", requests, limits)
By("Observing an event that indicates the pod was scheduled")
action := func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
predicate := func(e *api.Event) bool {
return e.Type == api.EventTypeNormal &&
e.Reason == "Scheduled" &&
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})
It("should not schedule pods that exceed the available amount of opaque integer resource.", func() {
By("Creating a pod that requires more of the opaque resource than is allocatable on any node")
requests := api.ResourceList{opaqueResName: resource.MustParse("6")}
limits := api.ResourceList{}
By("Observing an event that indicates the pod was not scheduled")
action := func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(newTestPod(f, "over-max-oir", requests, limits))
return err
}
predicate := func(e *api.Event) bool {
return e.Type == "Warning" &&
e.Reason == "FailedScheduling" &&
strings.Contains(e.Message, "failed to fit in any node")
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})
It("should account opaque integer resources in pods with multiple containers.", func() {
By("Creating a pod with two containers that together require less of the opaque resource than is allocatable on a node")
requests := api.ResourceList{opaqueResName: resource.MustParse("1")}
limits := api.ResourceList{}
image := framework.GetPauseImageName(f.ClientSet)
// This pod consumes 2 "foo" resources.
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "mult-container-oir",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "pause",
Image: image,
Resources: api.ResourceRequirements{
Requests: requests,
Limits: limits,
},
},
{
Name: "pause-sidecar",
Image: image,
Resources: api.ResourceRequirements{
Requests: requests,
Limits: limits,
},
},
},
},
}
By("Observing an event that indicates the pod was scheduled")
action := func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
predicate := func(e *api.Event) bool {
return e.Type == api.EventTypeNormal &&
e.Reason == "Scheduled" &&
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
By("Creating a pod with two containers that together require more of the opaque resource than is allocatable on any node")
requests = api.ResourceList{opaqueResName: resource.MustParse("3")}
limits = api.ResourceList{}
// This pod consumes 6 "foo" resources.
pod = &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "mult-container-over-max-oir",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "pause",
Image: image,
Resources: api.ResourceRequirements{
Requests: requests,
Limits: limits,
},
},
{
Name: "pause-sidecar",
Image: image,
Resources: api.ResourceRequirements{
Requests: requests,
Limits: limits,
},
},
},
},
}
By("Observing an event that indicates the pod was not scheduled")
action = func() error {
_, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
predicate = func(e *api.Event) bool {
return e.Type == "Warning" &&
e.Reason == "FailedScheduling" &&
strings.Contains(e.Message, "failed to fit in any node")
}
success, err = observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})
})
// Adds the opaque resource to a node.
func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName api.ResourceName) {
action := func() error {
patch := []byte(fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`, escapeForJSONPatch(opaqueResName)))
return f.ClientSet.Core().RESTClient().Patch(api.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error()
}
predicate := func(n *api.Node) bool {
capacity, foundCap := n.Status.Capacity[opaqueResName]
allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
return foundCap && capacity.MilliValue() == int64(5000) &&
foundAlloc && allocatable.MilliValue() == int64(5000)
}
success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
}
// Removes the opaque resource from a node.
func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName api.ResourceName) {
action := func() error {
patch := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/status/capacity/%s"}]`, escapeForJSONPatch(opaqueResName)))
f.ClientSet.Core().RESTClient().Patch(api.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do()
return nil // Ignore error -- the opaque resource may not exist.
}
predicate := func(n *api.Node) bool {
_, foundCap := n.Status.Capacity[opaqueResName]
_, foundAlloc := n.Status.Allocatable[opaqueResName]
return !foundCap && !foundAlloc
}
success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
}
func escapeForJSONPatch(resName api.ResourceName) string {
// Escape forward slashes in the resource name per the JSON Pointer spec.
// See https://tools.ietf.org/html/rfc6901#section-3
return strings.Replace(string(resName), "/", "~1", -1)
}
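For example, escapeForJSONPatch(api.OpaqueIntResourceName("foo")) yields "pod.alpha.kubernetes.io~1opaque-int-resource-foo", the form embedded in the patch paths above.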
// Returns true if a node update matching the predicate was emitted from the
// system after performing the supplied action.
func observeNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*api.Node) bool, action func() error) (bool, error) {
observedMatchingNode := false
nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
informerStartedChan := make(chan struct{})
var informerStartedGuard sync.Once
_, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = nodeSelector
ls, err := f.ClientSet.Core().Nodes().List(options)
return ls, err
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = nodeSelector
w, err := f.ClientSet.Core().Nodes().Watch(options)
// Signal parent goroutine that watching has begun.
informerStartedGuard.Do(func() { close(informerStartedChan) })
return w, err
},
},
&api.Node{},
0,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
n, ok := newObj.(*api.Node)
Expect(ok).To(Equal(true))
if nodePredicate(n) {
observedMatchingNode = true
}
},
},
)
// Start the informer and block this goroutine waiting for the started signal.
informerStopChan := make(chan struct{})
defer func() { close(informerStopChan) }()
go controller.Run(informerStopChan)
<-informerStartedChan
// Invoke the action function.
err := action()
if err != nil {
return false, err
}
// Poll whether the informer has found a matching node update with a timeout.
// Wait up to 2 minutes, polling every second.
timeout := 2 * time.Minute
interval := 1 * time.Second
err = wait.Poll(interval, timeout, func() (bool, error) {
return observedMatchingNode, nil
})
return err == nil, err
}
// Returns true if an event matching the predicate was emitted from the system
// after performing the supplied action.
func observeEventAfterAction(f *framework.Framework, eventPredicate func(*api.Event) bool, action func() error) (bool, error) {
observedMatchingEvent := false
// Create an informer to list/watch events from the test framework namespace.
_, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
ls, err := f.ClientSet.Core().Events(f.Namespace.Name).List(options)
return ls, err
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
w, err := f.ClientSet.Core().Events(f.Namespace.Name).Watch(options)
return w, err
},
},
&api.Event{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
e, ok := obj.(*api.Event)
By(fmt.Sprintf("Considering event: \nType = [%s], Reason = [%s], Message = [%s]", e.Type, e.Reason, e.Message))
Expect(ok).To(Equal(true))
if ok && eventPredicate(e) {
observedMatchingEvent = true
}
},
},
)
informerStopChan := make(chan struct{})
defer func() { close(informerStopChan) }()
go controller.Run(informerStopChan)
// Invoke the action function.
err := action()
if err != nil {
return false, err
}
// Poll whether the informer has found a matching event with a timeout.
// Wait up to 2 minutes, polling every second.
timeout := 2 * time.Minute
interval := 1 * time.Second
err = wait.Poll(interval, timeout, func() (bool, error) {
return observedMatchingEvent, nil
})
return err == nil, err
}