Support snapshotting a scheduler cache

Towards #60860
This commit is contained in:
Yongkun Anfernee Gui 2018-03-07 15:12:53 -08:00
parent 07c04c2a17
commit 73a9836d69
4 changed files with 81 additions and 0 deletions

View File

@ -80,6 +80,33 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
}
}
// Snapshot takes a snapshot of the current schedulerCache.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.Lock()
defer cache.mu.Unlock()
nodes := make(map[string]*NodeInfo)
for k, v := range cache.nodes {
nodes[k] = v.Clone()
}
assumedPods := make(map[string]bool)
for k, v := range cache.assumedPods {
assumedPods[k] = v
}
pdbs := make(map[string]*policy.PodDisruptionBudget)
for k, v := range cache.pdbs {
pdbs[k] = v.DeepCopy()
}
return &Snapshot{
Nodes: nodes,
AssumedPods: assumedPods,
Pdbs: pdbs,
}
}
func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
cache.mu.Lock()
defer cache.mu.Unlock()

View File

@ -325,6 +325,46 @@ func TestAddPodWillConfirm(t *testing.T) {
}
}
func TestSnapshot(t *testing.T) {
nodeName := "node"
now := time.Now()
ttl := 10 * time.Second
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
}
tests := []struct {
podsToAssume []*v1.Pod
podsToAdd []*v1.Pod
}{{ // two pod were assumed at same time. But first one is called Add() and gets confirmed.
podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
podsToAdd: []*v1.Pod{testPods[0]},
}}
for _, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
}
snapshot := cache.Snapshot()
if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) {
t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes)
}
if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
}
}
}
// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
now := time.Now()

View File

@ -119,4 +119,14 @@ type Cache interface {
// FilteredList returns all cached pods that pass the filter.
FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot
}
// Snapshot is a snapshot of cache state
type Snapshot struct {
AssumedPods map[string]bool
Nodes map[string]*NodeInfo
Pdbs map[string]*policy.PodDisruptionBudget
}

View File

@ -100,3 +100,7 @@ func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil
func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
return nil, nil
}
func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
return &schedulercache.Snapshot{}
}