Merge pull request #24377 from hongchaodeng/sched

Automatic merge from submit-queue

schedulercache: remove bind() from AssumePod

Due to #24197, we make bind() asynchronous and don't need it anymore.
Submit this PR to clean it up.
This commit is contained in:
k8s-merge-robot 2016-04-23 02:19:48 -07:00
commit 733832c15a
6 changed files with 17 additions and 35 deletions

View File

@ -106,7 +106,7 @@ func (s *Scheduler) scheduleOne() {
// will self-repair.
assumed := *pod
assumed.Spec.NodeName = dest
s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, func() bool { return true })
s.config.SchedulerCache.AssumePod(&assumed)
go func() {
defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))

View File

@ -98,19 +98,15 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*api.Pod, error)
return pods, nil
}
func (cache *schedulerCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
return cache.assumePodIfBindSucceed(pod, bind, time.Now())
func (cache *schedulerCache) AssumePod(pod *api.Pod) error {
return cache.assumePod(pod, time.Now())
}
// assumePodScheduled exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) assumePodIfBindSucceed(pod *api.Pod, bind func() bool, now time.Time) error {
// assumePod exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) assumePod(pod *api.Pod, now time.Time) error {
cache.mu.Lock()
defer cache.mu.Unlock()
if !bind() {
return nil
}
key, err := getPodKey(pod)
if err != nil {
return err

View File

@ -87,8 +87,8 @@ func TestAssumePodScheduled(t *testing.T) {
for i, tt := range tests {
cache := newSchedulerCache(time.Second, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.AssumePodIfBindSucceed(pod, alwaysTrue); err != nil {
t.Fatalf("AssumePodScheduled failed: %v", err)
if err := cache.AssumePod(pod); err != nil {
t.Fatalf("AssumePod failed: %v", err)
}
}
n := cache.nodes[nodeName]
@ -147,7 +147,7 @@ func TestExpirePod(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.assumePodIfBindSucceed(pod.pod, alwaysTrue, pod.assumedTime); err != nil {
if err := cache.assumePod(pod.pod, pod.assumedTime); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@ -195,7 +195,7 @@ func TestAddPodWillConfirm(t *testing.T) {
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
if err := cache.assumePod(podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@ -240,7 +240,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
now := time.Now()
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
if err := cache.assumePodIfBindSucceed(tt.pod, alwaysTrue, now); err != nil {
if err := cache.assumePod(tt.pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
cache.cleanupAssumedPods(now.Add(2 * ttl))
@ -369,7 +369,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
for _, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
if err := cache.assumePod(podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@ -527,14 +527,10 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
pod := makeBasePod(nodeName, objName, "0", "0", nil)
err := cache.assumePodIfBindSucceed(pod, alwaysTrue, assumedTime)
err := cache.assumePod(pod, assumedTime)
if err != nil {
b.Fatalf("assumePodIfBindSucceed failed: %v", err)
b.Fatalf("assumePod failed: %v", err)
}
}
return cache
}
func alwaysTrue() bool {
return true
}

View File

@ -56,14 +56,10 @@ import (
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
// AssumePodIfBindSucceed assumes a pod to be scheduled if binding the pod succeeded.
// If binding return true, the pod's information is aggregated into designated node.
// Note that both binding and assuming are done as one atomic operation from cache's view.
// No other events like Add would happen in between binding and assuming.
// We are passing the binding function and let implementation take care of concurrency control details.
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
// After expiration, its information would be subtracted.
AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error
AssumePod(pod *api.Pod) error
// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
// If added back, the pod's information would be added again.

View File

@ -27,10 +27,7 @@ type FakeCache struct {
AssumeFunc func(*api.Pod)
}
func (f *FakeCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
func (f *FakeCache) AssumePod(pod *api.Pod) error {
f.AssumeFunc(pod)
return nil
}

View File

@ -25,10 +25,7 @@ import (
// PodsToCache is used for testing
type PodsToCache []*api.Pod
func (p PodsToCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
func (p PodsToCache) AssumePod(pod *api.Pod) error {
return nil
}