[PATCH] Use nodename as key

This commit is contained in:
Harry Zhang 2018-03-02 00:34:55 -08:00 committed by Harry Zhang
parent ea5f0b1de2
commit 7a7f9dccd0
7 changed files with 105 additions and 59 deletions

View File

@ -212,8 +212,8 @@ type ExtenderPreemptionArgs struct {
// Pod being scheduled
Pod *v1.Pod
// Victims map generated by scheduler preemption phase
// Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims.
NodeToVictims map[*v1.Node]*Victims
// Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeNameToVictims.
NodeNameToVictims map[string]*Victims
NodeNameToMetaVictims map[string]*MetaVictims
}

View File

@ -199,9 +199,9 @@ type ExtenderPreemptionArgs struct {
// Pod being scheduled
Pod *apiv1.Pod `json:"pod"`
// Victims map generated by scheduler preemption phase
// Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims.
NodeToVictims map[*apiv1.Node]*Victims `json:"nodeToVictims,omitempty"`
NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"`
// Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeNameToVictims.
NodeNameToVictims map[string]*Victims `json:"nodeToVictims,omitempty"`
NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"`
}
// Victims represents:

View File

@ -208,11 +208,16 @@ func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) {
(*in).DeepCopyInto(*out)
}
}
if in.NodeToVictims != nil {
in, out := &in.NodeToVictims, &out.NodeToVictims
*out = make(map[*core_v1.Node]*Victims, len(*in))
for range *in {
// FIXME: Copying unassignable keys unsupported *core_v1.Node
if in.NodeNameToVictims != nil {
in, out := &in.NodeNameToVictims, &out.NodeNameToVictims
*out = make(map[string]*Victims, len(*in))
for key, val := range *in {
if val == nil {
(*out)[key] = nil
} else {
(*out)[key] = new(Victims)
val.DeepCopyInto((*out)[key])
}
}
}
if in.NodeNameToMetaVictims != nil {

View File

@ -208,11 +208,16 @@ func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) {
(*in).DeepCopyInto(*out)
}
}
if in.NodeToVictims != nil {
in, out := &in.NodeToVictims, &out.NodeToVictims
*out = make(map[*v1.Node]*Victims, len(*in))
for range *in {
// FIXME: Copying unassignable keys unsupported *v1.Node
if in.NodeNameToVictims != nil {
in, out := &in.NodeNameToVictims, &out.NodeNameToVictims
*out = make(map[string]*Victims, len(*in))
for key, val := range *in {
if val == nil {
(*out)[key] = nil
} else {
(*out)[key] = new(Victims)
val.DeepCopyInto((*out)[key])
}
}
}
if in.NodeNameToMetaVictims != nil {

View File

@ -128,15 +128,16 @@ func (h *HTTPExtender) ProcessPreemption(
if h.nodeCacheCapable {
// If extender has cached node info, pass NodeNameToMetaVictims in args.
nodeNameToVictims := convertToNodeNameToMetaVictims(nodeToVictims)
nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
args = &schedulerapi.ExtenderPreemptionArgs{
Pod: pod,
NodeNameToMetaVictims: nodeNameToVictims,
NodeNameToMetaVictims: nodeNameToMetaVictims,
}
} else {
nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
args = &schedulerapi.ExtenderPreemptionArgs{
Pod: pod,
NodeToVictims: nodeToVictims,
Pod: pod,
NodeNameToVictims: nodeNameToVictims,
}
}
@ -151,10 +152,10 @@ func (h *HTTPExtender) ProcessPreemption(
return nil, err
}
return nodeToVictims, nil
}
// convertToNodeToVictims converts from meta types to struct type.
// convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
// such as UIDs and names, to object pointers.
func (h *HTTPExtender) convertToNodeToVictims(
nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
@ -165,7 +166,7 @@ func (h *HTTPExtender) convertToNodeToVictims(
Pods: []*v1.Pod{},
}
for _, metaPod := range metaVictims.Pods {
pod, err := h.restorePodFromNodeInfo(metaPod, nodeName, nodeNameToInfo)
pod, err := h.convertPodUIDToPod(metaPod, nodeName, nodeNameToInfo)
if err != nil {
return nil, err
}
@ -176,11 +177,11 @@ func (h *HTTPExtender) convertToNodeToVictims(
return nodeToVictims, nil
}
// restorePodFromNodeInfo returns v1.Pod object for given MetaPod and node name.
// convertPodUIDToPod returns v1.Pod object for given MetaPod and node name.
// The v1.Pod object is restored by nodeInfo.Pods().
// It should return error if there's cache inconsistency between default scheduler and extender
// so that this pod or node is missing from nodeNameToInfo.
func (h *HTTPExtender) restorePodFromNodeInfo(
func (h *HTTPExtender) convertPodUIDToPod(
metaPod *schedulerapi.MetaPod,
nodeName string,
nodeNameToInfo map[string]*schedulercache.NodeInfo) (*v1.Pod, error) {
@ -219,6 +220,17 @@ func convertToNodeNameToMetaVictims(
return nodeNameToVictims
}
// convertToNodeNameToVictims re-keys a victims map from *v1.Node pointers to
// the nodes' names. The *Victims values are carried over unchanged (shared,
// not copied).
func convertToNodeNameToVictims(
	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
) map[string]*schedulerapi.Victims {
	byName := make(map[string]*schedulerapi.Victims, len(nodeToVictims))
	for n, v := range nodeToVictims {
		byName[n.GetName()] = v
	}
	return byName
}
// Filter based on extender implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list. failedNodesMap optionally contains
// the list of failed nodes and failure reasons.

View File

@ -119,6 +119,42 @@ type FakeExtender struct {
cachedPDBs []*policy.PodDisruptionBudget
}
// SupportsPreemption reports that this fake extender participates in the
// scheduler's preemption phase (ProcessPreemption will be invoked).
func (f *FakeExtender) SupportsPreemption() bool {
	// Assume preempt verb is always defined.
	return true
}
// ProcessPreemption runs the extender's preemption pass over the candidate
// victim map produced by the scheduler.
//
// For each node it asks selectVictimsOnNodeByExtender for additional victim
// pods. Nodes that remain unschedulable even after the extender's preemption
// are dropped from the result; for the rest, the extender's victims and
// PDB-violation count are merged into the scheduler's. The returned map is a
// true copy: neither the caller's map nor the *Victims values inside it are
// mutated.
func (f *FakeExtender) ProcessPreemption(
	pod *v1.Pod,
	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
) (map[*v1.Node]*schedulerapi.Victims, error) {
	// Deep-copy each Victims value. The previous code copied only the map
	// entries, so the *Victims pointers were shared and the mutations below
	// leaked into the caller's nodeToVictims despite the comment promising
	// the original would not change.
	nodeToVictimsCopy := make(map[*v1.Node]*schedulerapi.Victims, len(nodeToVictims))
	for node, victims := range nodeToVictims {
		// In a real-world implementation the extender's user would have their
		// own way to get a node object by name if needed (e.g. query
		// kube-apiserver). For test purposes we keep the node from the
		// parameters as the key directly.
		nodeToVictimsCopy[node] = &schedulerapi.Victims{
			// Fresh slice so a later append cannot write into the caller's
			// backing array.
			Pods:             append([]*v1.Pod{}, victims.Pods...),
			NumPDBViolations: victims.NumPDBViolations,
		}
	}
	for node, victims := range nodeToVictimsCopy {
		// Try to do preemption on the extender side.
		extenderVictimPods, extenderPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node, nodeNameToInfo)
		if !fits {
			// Still unfit after the extender's preemption: this node is
			// unresolvable by preemption overall, so remove it from the
			// potential preemption nodes. (Deleting the current key while
			// ranging over a map is well-defined in Go.)
			delete(nodeToVictimsCopy, node)
			continue
		}
		// Append the extender's new victims to the scheduler's victims.
		victims.Pods = append(victims.Pods, extenderVictimPods...)
		victims.NumPDBViolations += extenderPDBViolations
	}
	return nodeToVictimsCopy, nil
}
// selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side.
// Returns:
// 1. More victim pods (if any) amended by preemption phase of extender.
@ -127,6 +163,7 @@ type FakeExtender struct {
func (f *FakeExtender) selectVictimsOnNodeByExtender(
pod *v1.Pod,
node *v1.Node,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
) ([]*v1.Pod, int, bool) {
// TODO(harry): add more test in generic_scheduler_test.go to verify this logic.
// If an extender supports preemption but has no cached node info, let's run filter to make sure
@ -139,7 +176,7 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(
}
// Otherwise, as the extender supports preemption and has cached node info, we will assume cachedNodeNameToInfo is available
// and get cached node info by given nodeName.
// and get cached node info by given node name.
nodeInfoCopy := f.cachedNodeNameToInfo[node.GetName()].Clone()
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
@ -191,38 +228,6 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(
return victims, numViolatingVictim, true
}
func (f *FakeExtender) SupportsPreemption() bool {
// Assume preempt verb is always defined.
return true
}
func (f *FakeExtender) ProcessPreemption(
pod *v1.Pod,
nodeToVictims map[*v1.Node]*schedulerapi.Victims,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
) (map[*v1.Node]*schedulerapi.Victims, error) {
nodeToVictimsCopy := map[*v1.Node]*schedulerapi.Victims{}
// We don't want to change the original nodeToVictims
for k, v := range nodeToVictims {
nodeToVictimsCopy[k] = v
}
for node, victims := range nodeToVictimsCopy {
// Try to do preemption on extender side.
extenderVictimPods, extendernPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node)
// If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
// let's remove it from potential preemption nodes.
if !fits {
delete(nodeToVictimsCopy, node)
} else {
// Append new victims to original victims
nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...)
nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + extendernPDBViolations
}
}
return nodeToVictimsCopy, nil
}
// runPredicate run predicates of extender one by one for given pod and node.
// Returns: fits or not.
func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) {

View File

@ -541,11 +541,30 @@ type fakeExtender struct {
interestedPodName string
}
func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
// ProcessPreemption is a no-op stub: this fake extender does not implement
// preemption (its SupportsPreemption returns false), so it returns a nil map
// and no error.
func (f *fakeExtender) ProcessPreemption(
	pod *v1.Pod,
	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
) (map[*v1.Node]*schedulerapi.Victims, error) {
	return nil, nil
}
// SupportsPreemption reports that this fake extender does not take part in
// the preemption phase.
func (f *fakeExtender) SupportsPreemption() bool {
	return false
}
// Filter is a no-op stub: it reports no filtered nodes, no failed nodes, and
// no error.
func (f *fakeExtender) Filter(
	pod *v1.Pod,
	nodes []*v1.Node,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
	return nil, nil, nil
}
func (f *fakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
// Prioritize is a no-op stub: it returns no host priorities, zero weight, and
// no error.
func (f *fakeExtender) Prioritize(
	pod *v1.Pod,
	nodes []*v1.Node,
) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
	return nil, 0, nil
}