scheduler: remove err param from Register

This commit is contained in:
Sergiusz Urbaniak 2015-11-10 14:08:50 +01:00
parent 6fced095ce
commit 6a16fe314f
5 changed files with 45 additions and 27 deletions

View File

@ -278,10 +278,17 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
log.Infof("aborting Schedule, pod has been deleted %+v", pod) log.Infof("aborting Schedule, pod has been deleted %+v", pod)
return "", noSuchPodErr return "", noSuchPodErr
} }
task, err := k.api.tasks().Register(k.api.createPodTask(ctx, pod))
task, err := k.api.createPodTask(ctx, pod)
if err != nil { if err != nil {
return "", err return "", err
} }
task, err = k.api.tasks().Register(task)
if err != nil {
return "", err
}
return k.doSchedule(task) return k.doSchedule(task)
//TODO(jdef) it's possible that the pod state has diverged from what //TODO(jdef) it's possible that the pod state has diverged from what

View File

@ -880,11 +880,17 @@ func TestDeleteOne_PendingPod(t *testing.T) {
UID: "foo0", UID: "foo0",
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,
}}} }}}
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
if err != nil { if err != nil {
t.Fatalf("failed to create task: %v", err) t.Fatalf("failed to create task: %v", err)
} }
_, err = reg.Register(task)
if err != nil {
t.Fatalf("failed to register task: %v", err)
}
// preconditions // preconditions
qr := newQueuer(nil) qr := newQueuer(nil)
qr.podQueue.Add(pod, queue.ReplaceExisting) qr.podQueue.Add(pod, queue.ReplaceExisting)
@ -917,7 +923,13 @@ func TestDeleteOne_Running(t *testing.T) {
UID: "foo0", UID: "foo0",
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,
}}} }}}
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
task, err = reg.Register(task)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@ -41,7 +41,7 @@ const (
type Registry interface { type Registry interface {
// register the specified task with this registry, as long as the current error // register the specified task with this registry, as long as the current error
// condition is nil. if no errors occur then return a copy of the registered task. // condition is nil. if no errors occur then return a copy of the registered task.
Register(*T, error) (*T, error) Register(*T) (*T, error)
// unregister the specified task from this registry // unregister the specified task from this registry
Unregister(*T) Unregister(*T)
@ -103,8 +103,7 @@ func (k *inMemoryRegistry) ForPod(podID string) (task *T, currentState StateType
} }
// registers a pod task unless the spec'd error is not nil // registers a pod task unless the spec'd error is not nil
func (k *inMemoryRegistry) Register(task *T, err error) (*T, error) { func (k *inMemoryRegistry) Register(task *T) (*T, error) {
if err == nil {
k.rw.Lock() k.rw.Lock()
defer k.rw.Unlock() defer k.rw.Unlock()
if _, found := k.podToTask[task.podKey]; found { if _, found := k.podToTask[task.podKey]; found {
@ -115,8 +114,8 @@ func (k *inMemoryRegistry) Register(task *T, err error) (*T, error) {
} }
k.podToTask[task.podKey] = task.ID k.podToTask[task.podKey] = task.ID
k.taskRegistry[task.ID] = task k.taskRegistry[task.ID] = task
}
return task.Clone(), err return task.Clone(), nil
} }
// updates internal task state. updates are limited to Spec, Flags, and Offer for // updates internal task state. updates are limited to Spec, Flags, and Offer for

View File

@ -38,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
// add a task // add a task
a, _ := fakePodTask("a") a, _ := fakePodTask("a")
a_clone, err := registry.Register(a, nil) a_clone, err := registry.Register(a)
assert.NoError(err) assert.NoError(err)
assert.Equal(a_clone.ID, a.ID) assert.Equal(a_clone.ID, a.ID)
assert.Equal(a_clone.podKey, a.podKey) assert.Equal(a_clone.podKey, a.podKey)
// add another task // add another task
b, _ := fakePodTask("b") b, _ := fakePodTask("b")
b_clone, err := registry.Register(b, nil) b_clone, err := registry.Register(b)
assert.NoError(err) assert.NoError(err)
assert.Equal(b_clone.ID, b.ID) assert.Equal(b_clone.ID, b.ID)
assert.Equal(b_clone.podKey, b.podKey) assert.Equal(b_clone.podKey, b.podKey)
@ -79,21 +79,21 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
assert.Nil(task) assert.Nil(task)
// re-add a task // re-add a task
a_clone, err = registry.Register(a, nil) a_clone, err = registry.Register(a)
assert.Error(err) assert.Error(err)
assert.Nil(a_clone) assert.Nil(a_clone)
// re-add a task with another podKey, but same task id // re-add a task with another podKey, but same task id
another_a := a.Clone() another_a := a.Clone()
another_a.podKey = "another-pod" another_a.podKey = "another-pod"
another_a_clone, err := registry.Register(another_a, nil) another_a_clone, err := registry.Register(another_a)
assert.Error(err) assert.Error(err)
assert.Nil(another_a_clone) assert.Nil(another_a_clone)
// re-add a task with another task ID, but same podKey // re-add a task with another task ID, but same podKey
another_b := b.Clone() another_b := b.Clone()
another_b.ID = "another-task-id" another_b.ID = "another-task-id"
another_b_clone, err := registry.Register(another_b, nil) another_b_clone, err := registry.Register(another_b)
assert.Error(err) assert.Error(err)
assert.Nil(another_b_clone) assert.Nil(another_b_clone)
@ -124,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) {
// add a task // add a task
a, _ := fakePodTask("a") a, _ := fakePodTask("a")
a_clone, err := registry.Register(a, nil) a_clone, err := registry.Register(a)
assert.NoError(err) assert.NoError(err)
assert.Equal(a.State, a_clone.State) assert.Equal(a.State, a_clone.State)
@ -167,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
// create registry // create registry
registry := NewInMemoryRegistry() registry := NewInMemoryRegistry()
a, _ := fakePodTask("a") a, _ := fakePodTask("a")
registry.Register(a.Clone(), nil) // here clone a because we change it below registry.Register(a.Clone()) // here clone a because we change it below
// state changes are ignored // state changes are ignored
a.State = StateRunning a.State = StateRunning
@ -256,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry {
registry := NewInMemoryRegistry() registry := NewInMemoryRegistry()
a, _ := fakePodTask("a") a, _ := fakePodTask("a")
a, _ = registry.Register(a, nil) a, _ = registry.Register(a)
// initial pending state // initial pending state
assert.Equal(a.State, StatePending) assert.Equal(a.State, StatePending)

View File

@ -502,7 +502,7 @@ func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.Scheduler
} else if pod, err := k.client.Pods(namespace).Get(name); err == nil { } else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
if t, ok, err := podtask.RecoverFrom(*pod); ok { if t, ok, err := podtask.RecoverFrom(*pod); ok {
log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name) log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
_, err := k.taskRegistry.Register(t, nil) _, err := k.taskRegistry.Register(t)
if err != nil { if err != nil {
// someone beat us to it?! // someone beat us to it?!
log.Warningf("failed to register recovered task: %v", err) log.Warningf("failed to register recovered task: %v", err)
@ -912,7 +912,7 @@ func (ks *KubernetesScheduler) recoverTasks() error {
log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err) log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
} }
} else if ok { } else if ok {
ks.taskRegistry.Register(t, nil) ks.taskRegistry.Register(t)
recoverSlave(t) recoverSlave(t)
log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name) log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
} }