diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index bebcc454fb9..0029ec30293 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/util" + "fmt" "github.com/golang/glog" ) @@ -158,17 +159,9 @@ func (sched *Scheduler) Config() *Config { return sched.config } -func (sched *Scheduler) scheduleOne() { - pod := sched.config.NextPod() - if pod.DeletionTimestamp != nil { - sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) - glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) - return - } - - glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name) - start := time.Now() - dest, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister) +// schedule implements the scheduling algorithm and returns the suggested host. +func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) { + host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister) if err != nil { glog.V(1).Infof("Failed to schedule pod: %v/%v", pod.Namespace, pod.Name) sched.config.Error(pod, err) @@ -179,16 +172,19 @@ func (sched *Scheduler) scheduleOne() { Reason: v1.PodReasonUnschedulable, Message: err.Error(), }) - return + return "", err } - metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) + return host, err +} +// assume signals to the cache that a pod is already in the cache, so that binding can be asnychronous. +func (sched *Scheduler) assume(pod *v1.Pod, host string) error { // Optimistically assume that the binding will succeed and send it to apiserver // in the background. // If the binding fails, scheduler will release resources allocated to assumed pod // immediately. assumed := *pod - assumed.Spec.NodeName = dest + assumed.Spec.NodeName = host if err := sched.config.SchedulerCache.AssumePod(&assumed); err != nil { glog.Errorf("scheduler cache AssumePod failed: %v", err) // TODO: This means that a given pod is already in cache (which means it @@ -197,51 +193,85 @@ func (sched *Scheduler) scheduleOne() { // fix the problem, but should reduce its impact), we simply return here, // as binding doesn't make sense anyway. // This should be fixed properly though. - return + return err } // Optimistically assume that the binding will succeed, so we need to invalidate affected // predicates in equivalence cache. // If the binding fails, these invalidated item will not break anything. if sched.config.Ecache != nil { - sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(pod, dest) + sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(pod, host) + } + return nil +} + +// bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we +// handle binding metrics internally. +func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { + bindingStart := time.Now() + // If binding succeeded then PodScheduled condition will be updated in apiserver so that + // it's atomic with setting host. + err := sched.config.Binder.Bind(b) + if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil { + return fmt.Errorf("scheduler cache FinishBinding failed: %v", err) + } + if err != nil { + glog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name) + if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil { + return fmt.Errorf("scheduler cache ForgetPod failed: %v", err) + } + sched.config.Error(assumed, err) + sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err) + sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: "BindingRejected", + }) + return err + } + metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) + sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", assumed.Name, b.Target.Name) + return nil +} + +// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. +func (sched *Scheduler) scheduleOne() { + pod := sched.config.NextPod() + if pod.DeletionTimestamp != nil { + sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) + glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) + return } - go func() { - defer func() { - metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) - }() + glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name) - b := &v1.Binding{ + // Synchronously attempt to find a fit for the pod. + start := time.Now() + suggestedHost, err := sched.schedule(pod) + metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) + if err != nil { + return + } + + // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. + // This allows us to keep scheduling without waiting on binding to occur. + err = sched.assume(pod, suggestedHost) + if err != nil { + return + } + + // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). + go func() { + err := sched.bind(pod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}, Target: v1.ObjectReference{ Kind: "Node", - Name: dest, + Name: suggestedHost, }, - } - - bindingStart := time.Now() - // If binding succeeded then PodScheduled condition will be updated in apiserver so that - // it's atomic with setting host. - err := sched.config.Binder.Bind(b) - if err := sched.config.SchedulerCache.FinishBinding(&assumed); err != nil { - glog.Errorf("scheduler cache FinishBinding failed: %v", err) - } + }) + metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) if err != nil { - glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name) - if err := sched.config.SchedulerCache.ForgetPod(&assumed); err != nil { - glog.Errorf("scheduler cache ForgetPod failed: %v", err) - } - sched.config.Error(pod, err) - sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err) - sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: "BindingRejected", - }) - return + glog.Errorf("Internal error binding pod: (%v)", err) } - metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) - sched.config.Recorder.Eventf(pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest) }() }