diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 41511b13180..aa7c6e191fb 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -71,8 +71,8 @@ var (
 
 type fakeKubeletClient struct{}
 
-func (fakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.PodStatusResult, error) {
-	glog.V(3).Infof("Trying to get container info for %v/%v/%v", host, podNamespace, podID)
+func (fakeKubeletClient) GetPodStatus(host, podNamespace, podName string) (api.PodStatusResult, error) {
+	glog.V(3).Infof("Trying to get container info for %v/%v/%v", host, podNamespace, podName)
 	// This is a horrible hack to get around the fact that we can't provide
 	// different port numbers per kubelet...
 	var c client.PodInfoGetter
@@ -88,9 +88,9 @@ func (fakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.Pod
 			Port: 10251,
 		}
 	default:
-		glog.Fatalf("Can't get info for: '%v', '%v - %v'", host, podNamespace, podID)
+		glog.Fatalf("Can't get info for: '%v', '%v - %v'", host, podNamespace, podName)
 	}
-	r, err := c.GetPodStatus("127.0.0.1", podNamespace, podID)
+	r, err := c.GetPodStatus("127.0.0.1", podNamespace, podName)
 	if err != nil {
 		return r, err
 	}
@@ -304,29 +304,29 @@ func countEndpoints(eps *api.Endpoints) int {
 	return count
 }
 
-func podExists(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
+func podExists(c *client.Client, podNamespace string, podName string) wait.ConditionFunc {
 	return func() (bool, error) {
-		_, err := c.Pods(podNamespace).Get(podID)
+		_, err := c.Pods(podNamespace).Get(podName)
 		return err == nil, nil
 	}
 }
 
-func podNotFound(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
+func podNotFound(c *client.Client, podNamespace string, podName string) wait.ConditionFunc {
 	return func() (bool, error) {
-		_, err := c.Pods(podNamespace).Get(podID)
+		_, err := c.Pods(podNamespace).Get(podName)
 		return apierrors.IsNotFound(err), nil
 	}
 }
 
-func podRunning(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
+func podRunning(c *client.Client, podNamespace string, podName string) wait.ConditionFunc {
 	return func() (bool, error) {
-		pod, err := c.Pods(podNamespace).Get(podID)
+		pod, err := c.Pods(podNamespace).Get(podName)
 		if apierrors.IsNotFound(err) {
 			return false, nil
 		}
 		if err != nil {
 			// This could be a connection error so we want to retry, but log the error.
-			glog.Errorf("Error when reading pod %q: %v", podID, err)
+			glog.Errorf("Error when reading pod %q: %v", podName, err)
 			return false, nil
 		}
 		if pod.Status.Phase != api.PodRunning {
@@ -814,6 +814,61 @@ func runServiceTest(client *client.Client) {
 	glog.Info("Service test passed.")
 }
 
+func runSchedulerNoPhantomPodsTest(client *client.Client) {
+	pod := &api.Pod{
+		Spec: api.PodSpec{
+			Containers: []api.Container{
+				{
+					Name:  "c1",
+					Image: "kubernetes/pause",
+					Ports: []api.ContainerPort{
+						{ContainerPort: 1234, HostPort: 9999},
+					},
+					ImagePullPolicy: api.PullIfNotPresent,
+				},
+			},
+		},
+	}
+
+	// Assuming we only have two kubelets, the third pod here won't schedule
+	// if the scheduler doesn't correctly handle the delete for the second
+	// pod.
+	pod.ObjectMeta.Name = "phantom.foo"
+	foo, err := client.Pods(api.NamespaceDefault).Create(pod)
+	if err != nil {
+		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
+	}
+	if err := wait.Poll(time.Second, time.Second*10, podRunning(client, foo.Namespace, foo.Name)); err != nil {
+		glog.Fatalf("FAILED: pod never started running %v", err)
+	}
+
+	pod.ObjectMeta.Name = "phantom.bar"
+	bar, err := client.Pods(api.NamespaceDefault).Create(pod)
+	if err != nil {
+		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
+	}
+	if err := wait.Poll(time.Second, time.Second*10, podRunning(client, bar.Namespace, bar.Name)); err != nil {
+		glog.Fatalf("FAILED: pod never started running %v", err)
+	}
+
+	// Delete a pod to free up room.
+	err = client.Pods(api.NamespaceDefault).Delete(bar.Name)
+	if err != nil {
+		glog.Fatalf("FAILED: couldn't delete pod %q: %v", bar.Name, err)
+	}
+
+	pod.ObjectMeta.Name = "phantom.baz"
+	baz, err := client.Pods(api.NamespaceDefault).Create(pod)
+	if err != nil {
+		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
+	}
+	if err := wait.Poll(time.Second, time.Second*10, podRunning(client, baz.Namespace, baz.Name)); err != nil {
+		glog.Fatalf("FAILED: (Scheduler probably didn't process deletion of 'phantom.bar') Pod never started running: %v", err)
+	}
+
+	glog.Info("Scheduler doesn't make phantom pods: test passed.")
+}
+
 type testFunc func(*client.Client)
 
 func addFlags(fs *pflag.FlagSet) {
@@ -904,6 +959,11 @@ func main() {
 		glog.Fatalf("Expected 16 containers; got %v\n\nlist of created containers:\n\n%#v\n\nDocker 1 Created:\n\n%#v\n\nDocker 2 Created:\n\n%#v\n\n", len(createdConts), createdConts.List(), fakeDocker1.Created, fakeDocker2.Created)
 	}
 	glog.Infof("OK - found created containers: %#v", createdConts.List())
+
+	// This test doesn't run with the others because it can't run in
+	// parallel and also it schedules extra pods which would change the
+	// above pod counting logic.
+	runSchedulerNoPhantomPodsTest(kubeClient)
 }
 
 // ServeCachedManifestFile serves a file for kubelet to read.
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 4fc10b398ad..30b5dd60f5a 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -103,6 +103,21 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
 	return f.CreateFromKeys(predicateKeys, priorityKeys)
 }
 
+// ReflectorDeletionHook passes all operations through to Store, but calls
+// OnDelete in a goroutine if there is a deletion.
+type ReflectorDeletionHook struct {
+	cache.Store
+	OnDelete func(obj interface{})
+}
+
+func (r ReflectorDeletionHook) Delete(obj interface{}) error {
+	go func() {
+		defer util.HandleCrash()
+		r.OnDelete(obj)
+	}()
+	return r.Store.Delete(obj)
+}
+
 // Creates a scheduler from a set of registered fit predicate keys and priority keys.
 func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
 	glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
@@ -125,9 +140,22 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
 	// Watch and queue pods that need scheduling.
 	cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run()
 
+	// Pass through all events to the scheduled pod store, but on a deletion,
+	// also remove from the assumed pods.
+	assumedPodDeleter := ReflectorDeletionHook{
+		Store: f.ScheduledPodLister.Store,
+		OnDelete: func(obj interface{}) {
+			if pod, ok := obj.(*api.Pod); ok {
+				f.modeler.LockedAction(func() {
+					f.modeler.ForgetPod(pod)
+				})
+			}
+		},
+	}
+
 	// Watch and cache all running pods. Scheduler needs to find all pods
 	// so it knows where it's safe to place a pod. Cache this locally.
-	cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.ScheduledPodLister.Store, 0).Run()
+	cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run()
 
 	// Watch minions.
 	// Minions may be listed frequently, so provide a local up-to-date cache.
diff --git a/plugin/pkg/scheduler/modeler.go b/plugin/pkg/scheduler/modeler.go
index 97e0907475b..7833b5b94b6 100644
--- a/plugin/pkg/scheduler/modeler.go
+++ b/plugin/pkg/scheduler/modeler.go
@@ -19,6 +19,7 @@ package scheduler
 import (
 	"fmt"
 	"strings"
+	"sync"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
@@ -40,9 +41,24 @@ type ExtendedPodLister interface {
 	Exists(pod *api.Pod) (bool, error)
 }
 
+// actionLocker implements LockedAction (so the fake and SimpleModeler can both
+// use it).
+type actionLocker struct {
+	sync.Mutex
+}
+
+// LockedAction serializes calls of whatever is passed as 'do'.
+func (a *actionLocker) LockedAction(do func()) {
+	a.Lock()
+	defer a.Unlock()
+	do()
+}
+
 // FakeModeler implements the SystemModeler interface.
 type FakeModeler struct {
 	AssumePodFunc func(pod *api.Pod)
+	ForgetPodFunc func(pod *api.Pod)
+	actionLocker
 }
 
 // AssumePod calls the function variable if it is not nil.
@@ -52,6 +68,13 @@ func (f *FakeModeler) AssumePod(pod *api.Pod) {
 	}
 }
 
+// ForgetPod calls the function variable if it is not nil.
+func (f *FakeModeler) ForgetPod(pod *api.Pod) {
+	if f.ForgetPodFunc != nil {
+		f.ForgetPodFunc(pod)
+	}
+}
+
 // SimpleModeler implements the SystemModeler interface with a timed pod cache.
 type SimpleModeler struct {
 	queuedPods ExtendedPodLister
@@ -61,6 +84,8 @@ type SimpleModeler struct {
 	// haven't yet shown up in the scheduledPods variable.
 	// TODO: periodically clear this.
 	assumedPods *cache.StoreToPodLister
+
+	actionLocker
 }
 
 // NewSimpleModeler returns a new SimpleModeler.
@@ -78,6 +103,10 @@ func (s *SimpleModeler) AssumePod(pod *api.Pod) {
 	s.assumedPods.Add(pod)
 }
 
+func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
+	s.assumedPods.Delete(pod)
+}
+
 // Extract names for readable logging.
 func podNames(pods []api.Pod) []string {
 	out := make([]string, len(pods))
diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go
index 86b45f4322b..3dcedb8e1d8 100644
--- a/plugin/pkg/scheduler/scheduler.go
+++ b/plugin/pkg/scheduler/scheduler.go
@@ -44,6 +44,16 @@ type SystemModeler interface {
 	// The assumtion should last until the system confirms the
 	// assumtion or disconfirms it.
 	AssumePod(pod *api.Pod)
+	// ForgetPod removes a pod assumption. (It won't make the model
+	// show the absence of the given pod if the pod is in the scheduled
+	// pods list!)
+	ForgetPod(pod *api.Pod)
+
+	// For serializing calls to Assume/ForgetPod: imagine you want to add
+	// a pod iff a bind succeeds, but also remove a pod if it is deleted.
+	// TODO: if SystemModeler begins modeling things other than pods, this
+	// should probably be parameterized or specialized for pods.
+	LockedAction(f func())
 }
 
 // Scheduler watches for new unscheduled pods. It attempts to find
@@ -104,16 +114,21 @@ func (s *Scheduler) scheduleOne() {
 			Name: dest,
 		},
 	}
-	if err := s.config.Binder.Bind(b); err != nil {
-		glog.V(1).Infof("Failed to bind pod: %v", err)
-		s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
-		s.config.Error(pod, err)
-		return
-	}
-	s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
-	// tell the model to assume that this binding took effect.
-	assumed := *pod
-	assumed.Spec.Host = dest
-	assumed.Status.Host = dest
-	s.config.Modeler.AssumePod(&assumed)
+
+	// We want to add the pod to the model iff the bind succeeds, but we don't want to race
+	// with any deletions, which happen asynchronously.
+	s.config.Modeler.LockedAction(func() {
+		if err := s.config.Binder.Bind(b); err != nil {
+			glog.V(1).Infof("Failed to bind pod: %v", err)
+			s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
+			s.config.Error(pod, err)
+			return
+		}
+		s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
+		// tell the model to assume that this binding took effect.
+		assumed := *pod
+		assumed.Spec.Host = dest
+		assumed.Status.Host = dest
+		s.config.Modeler.AssumePod(&assumed)
+	})
 }
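Illustration (not part of the patch): the core of the change is that both sides of the race now run under the modeler's LockedAction. scheduleOne binds and assumes as one locked step, and ReflectorDeletionHook's OnDelete forgets the pod under the same lock. The standalone Go sketch below shows that pattern with toy stand-ins; toyModeler, AssumePod(name string), and the rest are hypothetical names, and only actionLocker mirrors the helper actually added to modeler.go.

package main

import (
	"fmt"
	"sync"
)

// actionLocker mirrors the helper added to modeler.go: it serializes
// whatever function is passed to LockedAction.
type actionLocker struct {
	sync.Mutex
}

func (a *actionLocker) LockedAction(do func()) {
	a.Lock()
	defer a.Unlock()
	do()
}

// toyModeler is a hypothetical stand-in for SimpleModeler: a set of
// assumed pod names guarded by the embedded actionLocker.
type toyModeler struct {
	actionLocker
	assumed map[string]bool
}

func (m *toyModeler) AssumePod(name string) { m.assumed[name] = true }
func (m *toyModeler) ForgetPod(name string) { delete(m.assumed, name) }

func main() {
	m := &toyModeler{assumed: map[string]bool{}}
	var wg sync.WaitGroup
	wg.Add(2)

	// Scheduler side: bind and assume as a single locked step, as
	// scheduleOne now does, so a delete cannot slip in between them.
	go func() {
		defer wg.Done()
		m.LockedAction(func() {
			bindSucceeded := true // stand-in for Binder.Bind
			if bindSucceeded {
				m.AssumePod("phantom.bar")
			}
		})
	}()

	// Watch side: a deletion observed by the reflector hook forgets the
	// pod under the same lock, mirroring the OnDelete callback above.
	go func() {
		defer wg.Done()
		m.LockedAction(func() {
			m.ForgetPod("phantom.bar")
		})
	}()

	wg.Wait()
	// Either ordering is possible, but the two locked sections never
	// interleave, which is what the patch relies on to avoid phantom
	// assumed pods.
	fmt.Println("assumed pods:", m.assumed)
}

ReflectorDeletionHook.Delete runs OnDelete in a goroutine (guarded by util.HandleCrash) before forwarding to the underlying store, presumably so the watch loop is not blocked while scheduleOne holds the modeler lock across a bind call.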