diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 1f54041c8c2..477c850b20a 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -99,19 +99,15 @@ func makeContainerKey(machine string) string { return "/registry/hosts/" + machine + "/kubelet" } -// CreatePod creates a pod based on a specification, schedule it onto a specific machine. -func (r *Registry) CreatePod(machine string, pod api.Pod) error { +// CreatePod creates a pod based on a specification. +func (r *Registry) CreatePod(pod api.Pod) error { // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. pod.DesiredState.Status = api.PodRunning pod.DesiredState.Host = "" - if err := r.CreateObj(makePodKey(pod.ID), &pod); err != nil { - return err - } - // TODO: Until scheduler separation is completed, just assign here. - return r.assignPod(pod.ID, machine) + return r.CreateObj(makePodKey(pod.ID), &pod) } // ApplyBinding implements binding's registry @@ -119,23 +115,29 @@ func (r *Registry) ApplyBinding(binding *api.Binding) error { return r.assignPod(binding.PodID, binding.Host) } -// assignPod assigns the given pod to the given machine. -// TODO: hook this up via apiserver, not by calling it from CreatePod(). -func (r *Registry) assignPod(podID string, machine string) error { +// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'. +// Returns the current state of the pod, or an error. +func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) { podKey := makePodKey(podID) - var finalPod *api.Pod - err := r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { + err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) } - if pod.DesiredState.Host != "" { + if pod.DesiredState.Host != oldMachine { return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.ID, pod.DesiredState.Host) } pod.DesiredState.Host = machine finalPod = pod return pod, nil }) + return finalPod, err +} + +// assignPod assigns the given pod to the given machine. +// TODO: hook this up via apiserver, not by calling it from CreatePod(). +func (r *Registry) assignPod(podID string, machine string) error { + finalPod, err := r.setPodHostTo(podID, "", machine) if err != nil { return err } @@ -151,11 +153,10 @@ func (r *Registry) assignPod(podID string, machine string) error { return manifests, nil }) if err != nil { - // Don't strand stuff. This is a terrible hack that won't be needed - // when the above TODO is fixed. - err2 := r.Delete(podKey, false) - if err2 != nil { - glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) + // Put the pod's host back the way it was. This is a terrible hack that + // won't be needed if we convert this to a rectification loop. + if _, err2 := r.setPodHostTo(podID, machine, ""); err2 != nil { + glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2) } } return err diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 0e10c226315..1bf44bd501e 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -78,7 +78,7 @@ func TestEtcdCreatePod(t *testing.T) { } fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -93,7 +93,13 @@ func TestEtcdCreatePod(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) } resp, err := fakeClient.Get("/registry/pods/foo", false, false) @@ -132,7 +138,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -158,20 +164,27 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { E: tools.EtcdErrorValueRequired, } registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, }) - if err == nil { - t.Fatalf("Unexpected non-error") + if err != nil { + t.Fatalf("Unexpected error: %v", err) } - _, err = fakeClient.Get("/registry/pods/foo", false, false) + + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) if err == nil { - t.Error("Unexpected non-error") + t.Fatalf("Unexpected non error.") } - if !tools.IsEtcdNotFound(err) { - t.Errorf("Unexpected error: %#v", err) + + existingPod, err := registry.GetPod("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if existingPod.DesiredState.Host == "machine" { + t.Fatal("Pod's host changed in response to an unappliable binding.") } } @@ -191,7 +204,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -210,6 +223,12 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + resp, err := fakeClient.Get("/registry/pods/foo", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -250,7 +269,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, }), 0) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -266,7 +285,13 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) } resp, err := fakeClient.Get("/registry/pods/foo", false, false) diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 0a2f0822be7..d65f82a080b 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -30,8 +30,8 @@ type Registry interface { WatchPods(resourceVersion uint64) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) - // Create a pod based on a specification, schedule it onto a specific machine. - CreatePod(machine string, pod api.Pod) error + // Create a pod based on a specification. + CreatePod(pod api.Pod) error // Update an existing pod UpdatePod(pod api.Pod) error // Delete an existing pod diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 0168ae42e32..47ac6fd0361 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -243,12 +243,7 @@ func getPodStatus(pod *api.Pod) api.PodStatus { func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error { rs.mu.Lock() defer rs.mu.Unlock() - // TODO(lavalamp): Separate scheduler more cleanly. - machine, err := rs.scheduler.Schedule(pod, rs.minionLister) - if err != nil { - return err - } - return rs.registry.CreatePod(machine, pod) + return rs.registry.CreatePod(pod) } func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { diff --git a/pkg/registry/pod/storage_test.go b/pkg/registry/pod/storage_test.go index 8c4e81f72de..7134cec716f 100644 --- a/pkg/registry/pod/storage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -74,26 +74,6 @@ func TestCreatePodRegistryError(t *testing.T) { expectApiStatusError(t, ch, podRegistry.Err.Error()) } -func TestCreatePodSchedulerError(t *testing.T) { - mockScheduler := registrytest.Scheduler{ - Err: fmt.Errorf("test error"), - } - storage := RegistryStorage{ - scheduler: &mockScheduler, - } - desiredState := api.PodState{ - Manifest: api.ContainerManifest{ - Version: "v1beta1", - }, - } - pod := &api.Pod{DesiredState: desiredState} - ch, err := storage.Create(pod) - if err != nil { - t.Errorf("Expected %#v, Got %#v", nil, err) - } - expectApiStatusError(t, ch, mockScheduler.Err.Error()) -} - func TestCreatePodSetsIds(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Err = fmt.Errorf("test error") diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index f702e8743a8..6efaaf7fa87 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -25,10 +25,9 @@ import ( ) type PodRegistry struct { - Err error - Machine string - Pod *api.Pod - Pods []api.Pod + Err error + Pod *api.Pod + Pods []api.Pod sync.Mutex mux *watch.Mux @@ -66,10 +65,9 @@ func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { return r.Pod, r.Err } -func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error { +func (r *PodRegistry) CreatePod(pod api.Pod) error { r.Lock() defer r.Unlock() - r.Machine = machine r.Pod = &pod r.mux.Action(watch.Added, &pod) return r.Err