Add RunPredicate to EquivalenceCache.

This method combines "lookup" and "update" into one operation. The
benefit is that this method call is very similar to running an ordinary
predicate, so callers can simplify their code.
This commit is contained in:
Jonathan Basseri 2018-04-23 15:02:39 -07:00
parent 625bce3ff6
commit ca6b312c97
2 changed files with 175 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"github.com/golang/glog"
@ -70,6 +71,36 @@ func NewEquivalenceCache() *EquivalenceCache {
}
}
// RunPredicate will return a cached predicate result. In case of a cache miss, the predicate will
// be run and its results cached for the next call.
//
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
func (ec *EquivalenceCache) RunPredicate(
pred algorithm.FitPredicate,
predicateKey string,
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
equivClassInfo *equivalenceClassInfo,
cache schedulercache.Cache,
) (bool, []algorithm.PredicateFailureReason, error) {
ec.Lock()
defer ec.Unlock()
fit, reasons, invalid := ec.PredicateWithECache(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash, false)
if !invalid {
return fit, reasons, nil
}
fit, reasons, err := pred(pod, meta, nodeInfo)
if err != nil {
return fit, reasons, err
}
// Skip update if NodeInfo is stale.
if cache != nil && cache.IsUpToDate(nodeInfo) {
ec.UpdateCachedPredicateItem(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, fit, reasons, equivClassInfo.hash, false)
}
return fit, reasons, nil
}
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(
podName, nodeName, predicateKey string,

View File

@ -17,6 +17,7 @@ limitations under the License.
package core
import (
"errors"
"reflect"
"sync"
"testing"
@ -157,6 +158,149 @@ type predicateItemType struct {
reasons []algorithm.PredicateFailureReason
}
// upToDateCache is a fake Cache where IsUpToDate always returns true.
type upToDateCache = schedulertesting.FakeCache
// staleNodeCache is a fake Cache where IsUpToDate always returns false.
type staleNodeCache struct {
schedulertesting.FakeCache
}
func (c *staleNodeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return false }
// mockPredicate provides an algorithm.FitPredicate with pre-set return values.
type mockPredicate struct {
fit bool
reasons []algorithm.PredicateFailureReason
err error
callCount int
}
func (p *mockPredicate) predicate(*v1.Pod, algorithm.PredicateMetadata, *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
p.callCount++
return p.fit, p.reasons, p.err
}
func TestRunPredicate(t *testing.T) {
tests := []struct {
name string
pred mockPredicate
cache schedulercache.Cache
expectFit, expectCacheHit, expectCacheWrite bool
expectedReasons []algorithm.PredicateFailureReason
expectedError string
}{
{
name: "pod fits/cache hit",
pred: mockPredicate{},
cache: &upToDateCache{},
expectFit: true,
expectCacheHit: true,
expectCacheWrite: false,
},
{
name: "pod fits/cache miss",
pred: mockPredicate{fit: true},
cache: &upToDateCache{},
expectFit: true,
expectCacheHit: false,
expectCacheWrite: true,
},
{
name: "pod fits/cache miss/no write",
pred: mockPredicate{fit: true},
cache: &staleNodeCache{},
expectFit: true,
expectCacheHit: false,
expectCacheWrite: false,
},
{
name: "pod doesn't fit/cache miss",
pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}},
cache: &upToDateCache{},
expectFit: false,
expectCacheHit: false,
expectCacheWrite: true,
expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate},
},
{
name: "pod doesn't fit/cache hit",
pred: mockPredicate{},
cache: &upToDateCache{},
expectFit: false,
expectCacheHit: true,
expectCacheWrite: false,
expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate},
},
{
name: "predicate error",
pred: mockPredicate{err: errors.New("This is expected")},
cache: &upToDateCache{},
expectFit: false,
expectCacheHit: false,
expectCacheWrite: false,
expectedError: "This is expected",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}})
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
ecache := NewEquivalenceCache()
equivClass := ecache.getEquivalenceClassInfo(pod)
if test.expectCacheHit {
ecache.UpdateCachedPredicateItem(pod.Name, node.Node().Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash)
}
fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
if err != nil {
if err.Error() != test.expectedError {
t.Errorf("Expected error %v but got %v", test.expectedError, err)
}
} else if len(test.expectedError) > 0 {
t.Errorf("Expected error %v but got nil", test.expectedError)
}
if fit && !test.expectFit {
t.Errorf("pod should not fit")
}
if !fit && test.expectFit {
t.Errorf("pod should fit")
}
if len(reasons) != len(test.expectedReasons) {
t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons)
} else {
for i, reason := range reasons {
if reason != test.expectedReasons[i] {
t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons)
break
}
}
}
if test.expectCacheHit && test.pred.callCount != 0 {
t.Errorf("Predicate should not be called")
}
if !test.expectCacheHit && test.pred.callCount == 0 {
t.Errorf("Predicate should be called")
}
_, _, invalid := ecache.PredicateWithECache(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
if invalid && test.expectCacheWrite {
t.Errorf("Cache write should happen")
}
if !test.expectCacheHit && test.expectCacheWrite && invalid {
t.Errorf("Cache write should happen")
}
if !test.expectCacheHit && !test.expectCacheWrite && !invalid {
t.Errorf("Cache write should not happen")
}
})
}
}
func TestUpdateCachedPredicateItem(t *testing.T) {
tests := []struct {
name string