Move service affinity predicate logic to its plugin.

This commit is contained in:
Abdullah Gharaibeh
2019-12-13 12:46:29 -05:00
parent ded2ff39c3
commit 7331ec7b02
10 changed files with 410 additions and 592 deletions

View File

@@ -25,7 +25,6 @@ import (
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@@ -165,18 +164,18 @@ func TestServiceAffinity(t *testing.T) {
nodes := []*v1.Node{&node1, &node2, &node3, &node4, &node5}
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
predicate, precompute := predicates.NewServiceAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods(), fakelisters.ServiceLister(test.services), test.labels)
predicates.RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute)
p := &ServiceAffinity{
predicate: predicate,
sharedLister: snapshot,
serviceLister: fakelisters.ServiceLister(test.services),
args: Args{
AffinityLabels: test.labels,
},
}
factory := &predicates.MetadataProducerFactory{}
meta := factory.GetPredicateMetadata(test.pod, snapshot)
state := framework.NewCycleState()
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
if s := p.PreFilter(context.Background(), state, test.pod); !s.IsSuccess() {
t.Errorf("PreFilter failed: %v", s.Message())
}
status := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name])
if status.Code() != test.res {
t.Errorf("Status mismatch. got: %v, want: %v", status.Code(), test.res)
@@ -391,12 +390,12 @@ func TestServiceAffinityScore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nodes := makeLabeledNodeList(test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
serviceLister := fakelisters.ServiceLister(test.services)
priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(snapshot.Pods(), serviceLister, test.labels)
p := &ServiceAffinity{
handle: fh,
sharedLister: snapshot,
serviceLister: serviceLister,
priorityMapFunction: priorityMapFunction,
priorityReduceFunction: priorityReduceFunction,
}
@@ -434,6 +433,160 @@ func TestServiceAffinityScore(t *testing.T) {
}
}
func TestPreFilterStateAddRemovePod(t *testing.T) {
var label1 = map[string]string{
"region": "r1",
"zone": "z11",
}
var label2 = map[string]string{
"region": "r1",
"zone": "z12",
}
var label3 = map[string]string{
"region": "r2",
"zone": "z21",
}
selector1 := map[string]string{"foo": "bar"}
tests := []struct {
name string
pendingPod *v1.Pod
addedPod *v1.Pod
existingPods []*v1.Pod
nodes []*v1.Node
services []*v1.Service
}{
{
name: "no anti-affinity or service affinity exist",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{NodeName: "nodeC"},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeB"},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
{
name: "metadata service-affinity data are updated correctly after adding and removing a pod",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{NodeName: "nodeC"},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeB"},
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// getMeta creates predicate meta data given the list of pods.
getState := func(pods []*v1.Pod) (*ServiceAffinity, *framework.CycleState, *preFilterState, *nodeinfosnapshot.Snapshot) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(pods, test.nodes))
p := &ServiceAffinity{
sharedLister: snapshot,
serviceLister: fakelisters.ServiceLister(test.services),
}
cycleState := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), cycleState, test.pendingPod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
plState, err := getPreFilterState(cycleState)
if err != nil {
t.Errorf("failed to get metadata from cycleState: %v", err)
}
return p, cycleState, plState, snapshot
}
sortState := func(plState *preFilterState) *preFilterState {
sort.SliceStable(plState.matchingPodList, func(i, j int) bool {
return plState.matchingPodList[i].Name < plState.matchingPodList[j].Name
})
sort.SliceStable(plState.matchingPodServices, func(i, j int) bool {
return plState.matchingPodServices[i].Name < plState.matchingPodServices[j].Name
})
return plState
}
// allPodsState is the state produced when all pods, including test.addedPod are given to prefilter.
_, _, plStateAllPods, _ := getState(append(test.existingPods, test.addedPod))
// state is produced for test.existingPods (without test.addedPod).
ipa, state, plState, snapshot := getState(test.existingPods)
// clone the state so that we can compare it later when performing Remove.
plStateOriginal, _ := plState.Clone().(*preFilterState)
// Add test.addedPod to state1 and verify it is equal to allPodsState.
if err := ipa.AddPod(context.Background(), state, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil {
t.Errorf("error adding pod to preFilterState: %v", err)
}
if !reflect.DeepEqual(sortState(plStateAllPods), sortState(plState)) {
t.Errorf("State is not equal, got: %v, want: %v", plState, plStateAllPods)
}
// Remove the added pod pod and make sure it is equal to the original state.
if err := ipa.RemovePod(context.Background(), state, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil {
t.Errorf("error removing pod from preFilterState: %v", err)
}
if !reflect.DeepEqual(sortState(plStateOriginal), sortState(plState)) {
t.Errorf("State is not equal, got: %v, want: %v", plState, plStateOriginal)
}
})
}
}
func TestPreFilterStateClone(t *testing.T) {
source := &preFilterState{
matchingPodList: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}},
},
matchingPodServices: []*v1.Service{
{ObjectMeta: metav1.ObjectMeta{Name: "service1"}},
},
}
clone := source.Clone()
if clone == source {
t.Errorf("Clone returned the exact same object!")
}
if !reflect.DeepEqual(clone, source) {
t.Errorf("Copy is not equal to source!")
}
}
func makeLabeledNodeList(nodeMap map[string]map[string]string) []*v1.Node {
nodes := make([]*v1.Node, 0, len(nodeMap))
for nodeName, labels := range nodeMap {