Cache Allocatable Resources

This commit is contained in:
Wojciech Tyczynski 2016-07-12 16:30:26 +02:00
parent 58c201834c
commit c929d95884
5 changed files with 82 additions and 44 deletions

View File

@ -491,22 +491,18 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
return true, nil
}
allocatable := node.Status.Allocatable
totalMilliCPU := allocatable.Cpu().MilliValue()
totalMemory := allocatable.Memory().Value()
totalNvidiaGPU := allocatable.NvidiaGPU().Value()
if totalMilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU {
allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU {
return false,
newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, totalMilliCPU)
newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU)
}
if totalMemory < podRequest.memory+nodeInfo.RequestedResource().Memory {
if allocatable.Memory < podRequest.memory+nodeInfo.RequestedResource().Memory {
return false,
newInsufficientResourceError(memoryResourceName, podRequest.memory, nodeInfo.RequestedResource().Memory, totalMemory)
newInsufficientResourceError(memoryResourceName, podRequest.memory, nodeInfo.RequestedResource().Memory, allocatable.Memory)
}
if totalNvidiaGPU < podRequest.nvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
if allocatable.NvidiaGPU < podRequest.nvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
return false,
newInsufficientResourceError(nvidiaGpuResourceName, podRequest.nvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, totalNvidiaGPU)
newInsufficientResourceError(nvidiaGpuResourceName, podRequest.nvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU)
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is

View File

@ -57,22 +57,20 @@ func calculateScore(requested int64, capacity int64, node string) int64 {
// 'pods' is a list of pods currently scheduled on the node.
// TODO: Use Node() from nodeInfo instead of passing it.
func calculateResourceOccupancy(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
capacityMemory := node.Status.Allocatable.Memory().Value()
allocatableResources := nodeInfo.AllocatableResource()
totalResources := *podRequests
totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
totalResources.Memory += nodeInfo.NonZeroRequest().Memory
cpuScore := calculateScore(totalResources.MilliCPU, capacityMilliCPU, node.Name)
memoryScore := calculateScore(totalResources.Memory, capacityMemory, node.Name)
cpuScore := calculateScore(totalResources.MilliCPU, allocatableResources.MilliCPU, node.Name)
memoryScore := calculateScore(totalResources.Memory, allocatableResources.Memory, node.Name)
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.V(10).Infof(
"%v -> %v: Least Requested Priority, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d CPU %d memory",
pod.Name, node.Name,
capacityMilliCPU, capacityMemory,
allocatableResources.MilliCPU, allocatableResources.Memory,
totalResources.MilliCPU, totalResources.Memory,
cpuScore, memoryScore,
)
@ -239,15 +237,13 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
// TODO: Use Node() from nodeInfo instead of passing it.
func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
capacityMemory := node.Status.Allocatable.Memory().Value()
allocatableResources := nodeInfo.AllocatableResource()
totalResources := *podRequests
totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
totalResources.Memory += nodeInfo.NonZeroRequest().Memory
cpuFraction := fractionOfCapacity(totalResources.MilliCPU, capacityMilliCPU)
memoryFraction := fractionOfCapacity(totalResources.Memory, capacityMemory)
cpuFraction := fractionOfCapacity(totalResources.MilliCPU, allocatableResources.MilliCPU)
memoryFraction := fractionOfCapacity(totalResources.Memory, allocatableResources.Memory)
score := int(0)
if cpuFraction >= 1 || memoryFraction >= 1 {
// if requested >= capacity, the corresponding host should never be preferrred.
@ -266,7 +262,7 @@ func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercac
glog.V(10).Infof(
"%v -> %v: Balanced Resource Allocation, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
pod.Name, node.Name,
capacityMilliCPU, capacityMemory,
allocatableResources.MilliCPU, allocatableResources.Memory,
totalResources.MilliCPU, totalResources.Memory,
score,
)

View File

@ -138,6 +138,12 @@ func TestZeroRequest(t *testing.T) {
const expectedPriority int = 25
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
for _, node := range test.nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {
nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
}
nodeNameToInfo[node.Name].SetNode(node)
}
list, err := scheduler.PrioritizeNodes(
test.pod,
nodeNameToInfo,
@ -389,6 +395,12 @@ func TestLeastRequested(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
for _, node := range test.nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {
nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
}
nodeNameToInfo[node.Name].SetNode(node)
}
list, err := LeastRequestedPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -722,6 +734,12 @@ func TestBalancedResourceAllocation(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
for _, node := range test.nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {
nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
}
nodeNameToInfo[node.Name].SetNode(node)
}
list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v", err)

View File

@ -64,7 +64,8 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[0]},
},
}, {
pods: []*api.Pod{testPods[1], testPods[2]},
@ -77,7 +78,8 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: 300,
Memory: 1524,
},
pods: []*api.Pod{testPods[1], testPods[2]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[1], testPods[2]},
},
}, { // test non-zero request
pods: []*api.Pod{testPods[3]},
@ -90,7 +92,8 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: priorityutil.DefaultMilliCpuRequest,
Memory: priorityutil.DefaultMemoryRequest,
},
pods: []*api.Pod{testPods[3]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[3]},
},
}}
@ -147,7 +150,8 @@ func TestExpirePod(t *testing.T) {
MilliCPU: 200,
Memory: 1024,
},
pods: []*api.Pod{testPods[1]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[1]},
},
}}
@ -194,7 +198,8 @@ func TestAddPodWillConfirm(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[0]},
},
}}
@ -237,7 +242,8 @@ func TestAddPodAfterExpiration(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{basePod},
allocatableResource: &Resource{},
pods: []*api.Pod{basePod},
},
}}
@ -288,7 +294,8 @@ func TestUpdatePod(t *testing.T) {
MilliCPU: 200,
Memory: 1024,
},
pods: []*api.Pod{testPods[1]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[1]},
}, {
requestedResource: &Resource{
MilliCPU: 100,
@ -298,7 +305,8 @@ func TestUpdatePod(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[0]},
}},
}}
@ -351,7 +359,8 @@ func TestExpireAddUpdatePod(t *testing.T) {
MilliCPU: 200,
Memory: 1024,
},
pods: []*api.Pod{testPods[1]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[1]},
}, {
requestedResource: &Resource{
MilliCPU: 100,
@ -361,7 +370,8 @@ func TestExpireAddUpdatePod(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
allocatableResource: &Resource{},
pods: []*api.Pod{testPods[0]},
}},
}}
@ -414,7 +424,8 @@ func TestRemovePod(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{basePod},
allocatableResource: &Resource{},
pods: []*api.Pod{basePod},
},
}}

View File

@ -39,6 +39,9 @@ type NodeInfo struct {
requestedResource *Resource
pods []*api.Pod
nonzeroRequest *Resource
// We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
// as int64, to avoid conversions and accessing map.
allocatableResource *Resource
// We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
// explicitly as int, to avoid conversions and improve performance.
allowedPodNumber int
@ -60,10 +63,11 @@ type Resource struct {
// the returned object.
func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
ni := &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
allowedPodNumber: 0,
generation: 0,
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
allocatableResource: &Resource{},
allowedPodNumber: 0,
generation: 0,
}
for _, pod := range pods {
ni.addPod(pod)
@ -110,15 +114,24 @@ func (n *NodeInfo) NonZeroRequest() Resource {
return *n.nonzeroRequest
}
// AllocatableResource returns allocatable resources on a given node.
func (n *NodeInfo) AllocatableResource() Resource {
if n == nil {
return emptyResource
}
return *n.allocatableResource
}
func (n *NodeInfo) Clone() *NodeInfo {
pods := append([]*api.Pod(nil), n.pods...)
clone := &NodeInfo{
node: n.node,
requestedResource: &(*n.requestedResource),
nonzeroRequest: &(*n.nonzeroRequest),
allowedPodNumber: n.allowedPodNumber,
pods: pods,
generation: n.generation,
node: n.node,
requestedResource: &(*n.requestedResource),
nonzeroRequest: &(*n.nonzeroRequest),
allocatableResource: &(*n.allocatableResource),
allowedPodNumber: n.allowedPodNumber,
pods: pods,
generation: n.generation,
}
return clone
}
@ -193,6 +206,9 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no
// Sets the overall node information.
func (n *NodeInfo) SetNode(node *api.Node) error {
n.node = node
n.allocatableResource.MilliCPU = node.Status.Allocatable.Cpu().MilliValue()
n.allocatableResource.Memory = node.Status.Allocatable.Memory().Value()
n.allocatableResource.NvidiaGPU = node.Status.Allocatable.NvidiaGPU().Value()
n.allowedPodNumber = int(node.Status.Allocatable.Pods().Value())
n.generation++
return nil
@ -205,6 +221,7 @@ func (n *NodeInfo) RemoveNode(node *api.Node) error {
// and thus can potentially be observed later, even though they happened before
// node removal. This is handled correctly in cache.go file.
n.node = nil
n.allocatableResource = &Resource{}
n.allowedPodNumber = 0
n.generation++
return nil