diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go
index 78cdca25b3e..18545979209 100644
--- a/pkg/scheduler/schedulercache/cache.go
+++ b/pkg/scheduler/schedulercache/cache.go
@@ -80,6 +80,33 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
 	}
 }
 
+// Snapshot takes a snapshot of the current schedulerCache.
+func (cache *schedulerCache) Snapshot() *Snapshot {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+
+	nodes := make(map[string]*NodeInfo)
+	for k, v := range cache.nodes {
+		nodes[k] = v.Clone()
+	}
+
+	assumedPods := make(map[string]bool)
+	for k, v := range cache.assumedPods {
+		assumedPods[k] = v
+	}
+
+	pdbs := make(map[string]*policy.PodDisruptionBudget)
+	for k, v := range cache.pdbs {
+		pdbs[k] = v.DeepCopy()
+	}
+
+	return &Snapshot{
+		Nodes:       nodes,
+		AssumedPods: assumedPods,
+		Pdbs:        pdbs,
+	}
+}
+
 func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
diff --git a/pkg/scheduler/schedulercache/cache_test.go b/pkg/scheduler/schedulercache/cache_test.go
index 0eb184bdf3b..04677e7b340 100644
--- a/pkg/scheduler/schedulercache/cache_test.go
+++ b/pkg/scheduler/schedulercache/cache_test.go
@@ -325,6 +325,46 @@ func TestAddPodWillConfirm(t *testing.T) {
 	}
 }
 
+func TestSnapshot(t *testing.T) {
+	nodeName := "node"
+	now := time.Now()
+	ttl := 10 * time.Second
+
+	testPods := []*v1.Pod{
+		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
+		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
+	}
+	tests := []struct {
+		podsToAssume []*v1.Pod
+		podsToAdd    []*v1.Pod
+	}{{ // two pods are assumed at the same time, but only the first one is Add()ed and gets confirmed.
+		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
+		podsToAdd:    []*v1.Pod{testPods[0]},
+	}}
+
+	for _, tt := range tests {
+		cache := newSchedulerCache(ttl, time.Second, nil)
+		for _, podToAssume := range tt.podsToAssume {
+			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
+				t.Fatalf("assumePod failed: %v", err)
+			}
+		}
+		for _, podToAdd := range tt.podsToAdd {
+			if err := cache.AddPod(podToAdd); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+		}
+
+		snapshot := cache.Snapshot()
+		if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) {
+			t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes)
+		}
+		if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
+			t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
+		}
+	}
+}
+
 // TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
 func TestAddPodWillReplaceAssumed(t *testing.T) {
 	now := time.Now()
diff --git a/pkg/scheduler/schedulercache/interface.go b/pkg/scheduler/schedulercache/interface.go
index d3c65313b83..8e4f4909c75 100644
--- a/pkg/scheduler/schedulercache/interface.go
+++ b/pkg/scheduler/schedulercache/interface.go
@@ -119,4 +119,14 @@ type Cache interface {
 
 	// FilteredList returns all cached pods that pass the filter.
 	FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
+
+	// Snapshot takes a snapshot of the current cache state.
+	Snapshot() *Snapshot
+}
+
+// Snapshot is a snapshot of the scheduler cache state.
+type Snapshot struct {
+	AssumedPods map[string]bool
+	Nodes       map[string]*NodeInfo
+	Pdbs        map[string]*policy.PodDisruptionBudget
 }
diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go
index 21c4528ae21..6e1143d2e70 100644
--- a/pkg/scheduler/testing/fake_cache.go
+++ b/pkg/scheduler/testing/fake_cache.go
@@ -100,3 +100,7 @@ func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil
 func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
 	return nil, nil
 }
+
+func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
+	return &schedulercache.Snapshot{}
+}
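
Usage sketch (not part of the diff above): the snippet below shows one way a caller might consume the new Snapshot() accessor. It assumes the k8s.io/kubernetes/pkg/scheduler/schedulercache import path and introduces a hypothetical snapshotSummary helper; the point is only that Snapshot() hands back cloned maps, so the caller can read them without holding the cache's internal lock.

package example

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

// snapshotSummary is a hypothetical helper (not part of the change above).
// It takes a point-in-time copy of the cache and reports the sizes of the
// copied maps; because Snapshot() returns clones, reading them cannot race
// with concurrent cache updates.
func snapshotSummary(cache schedulercache.Cache) string {
	snapshot := cache.Snapshot()
	return fmt.Sprintf("nodes=%d assumedPods=%d pdbs=%d",
		len(snapshot.Nodes), len(snapshot.AssumedPods), len(snapshot.Pdbs))
}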