mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 00:07:50 +00:00
Merge pull request #15976 from mesosphere/sur-k8sm-475-error-checking
Auto commit by PR queue bot
This commit is contained in:
commit
8d923afe23
@ -278,7 +278,18 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
|
|||||||
log.Infof("aborting Schedule, pod has been deleted %+v", pod)
|
log.Infof("aborting Schedule, pod has been deleted %+v", pod)
|
||||||
return "", noSuchPodErr
|
return "", noSuchPodErr
|
||||||
}
|
}
|
||||||
return k.doSchedule(k.api.tasks().Register(k.api.createPodTask(ctx, pod)))
|
|
||||||
|
task, err := k.api.createPodTask(ctx, pod)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
task, err = k.api.tasks().Register(task)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return k.doSchedule(task)
|
||||||
|
|
||||||
//TODO(jdef) it's possible that the pod state has diverged from what
|
//TODO(jdef) it's possible that the pod state has diverged from what
|
||||||
//we knew previously, we should probably update the task.Pod state here
|
//we knew previously, we should probably update the task.Pod state here
|
||||||
@ -294,7 +305,7 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
|
|||||||
// but we're going to let someone else handle it, probably the mesos task error handler
|
// but we're going to let someone else handle it, probably the mesos task error handler
|
||||||
return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
|
return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
|
||||||
} else {
|
} else {
|
||||||
return k.doSchedule(task, nil)
|
return k.doSchedule(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -302,16 +313,18 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
|
// doSchedule schedules the given task and returns the machine the task is scheduled on
|
||||||
func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
|
// or an error if the scheduling failed.
|
||||||
|
func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) {
|
||||||
var offer offers.Perishable
|
var offer offers.Perishable
|
||||||
|
var err error
|
||||||
|
|
||||||
if task.HasAcceptedOffer() {
|
if task.HasAcceptedOffer() {
|
||||||
// verify that the offer is still on the table
|
// verify that the offer is still on the table
|
||||||
offerId := task.GetOfferId()
|
var ok bool
|
||||||
if offer, ok := k.api.offers().Get(offerId); ok && !offer.HasExpired() {
|
offer, ok = k.api.offers().Get(task.GetOfferId())
|
||||||
// skip tasks that have already have assigned offers
|
|
||||||
offer = task.Offer
|
if !ok || offer.HasExpired() {
|
||||||
} else {
|
|
||||||
task.Offer.Release()
|
task.Offer.Release()
|
||||||
task.Reset()
|
task.Reset()
|
||||||
if err = k.api.tasks().Update(task); err != nil {
|
if err = k.api.tasks().Update(task); err != nil {
|
||||||
@ -319,36 +332,46 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == nil && offer == nil {
|
|
||||||
|
if offer == nil {
|
||||||
offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
|
offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
details := offer.Details()
|
details := offer.Details()
|
||||||
if details == nil {
|
if details == nil {
|
||||||
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
|
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
slaveId := details.GetSlaveId().GetValue()
|
slaveId := details.GetSlaveId().GetValue()
|
||||||
if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" {
|
slaveHostName := k.api.slaveHostNameFor(slaveId)
|
||||||
|
if slaveHostName == "" {
|
||||||
// not much sense in Release()ing the offer here since its owner died
|
// not much sense in Release()ing the offer here since its owner died
|
||||||
offer.Release()
|
offer.Release()
|
||||||
k.api.offers().Invalidate(details.Id.GetValue())
|
k.api.offers().Invalidate(details.Id.GetValue())
|
||||||
return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
|
return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
|
||||||
} else {
|
|
||||||
if task.Offer != nil && task.Offer != offer {
|
|
||||||
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
|
|
||||||
}
|
|
||||||
|
|
||||||
task.Offer = offer
|
|
||||||
k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
|
|
||||||
|
|
||||||
if err := k.api.tasks().Update(task); err != nil {
|
|
||||||
offer.Release()
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return slaveHostName, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if task.Offer != nil && task.Offer != offer {
|
||||||
|
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
|
||||||
|
}
|
||||||
|
|
||||||
|
task.Offer = offer
|
||||||
|
if err := k.api.algorithm().Procurement()(task, details); err != nil {
|
||||||
|
offer.Release()
|
||||||
|
task.Reset()
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := k.api.tasks().Update(task); err != nil {
|
||||||
|
offer.Release()
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return slaveHostName, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type queuer struct {
|
type queuer struct {
|
||||||
|
@ -880,11 +880,17 @@ func TestDeleteOne_PendingPod(t *testing.T) {
|
|||||||
UID: "foo0",
|
UID: "foo0",
|
||||||
Namespace: api.NamespaceDefault,
|
Namespace: api.NamespaceDefault,
|
||||||
}}}
|
}}}
|
||||||
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
|
|
||||||
|
task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to create task: %v", err)
|
t.Fatalf("failed to create task: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, err = reg.Register(task)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to register task: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// preconditions
|
// preconditions
|
||||||
qr := newQueuer(nil)
|
qr := newQueuer(nil)
|
||||||
qr.podQueue.Add(pod, queue.ReplaceExisting)
|
qr.podQueue.Add(pod, queue.ReplaceExisting)
|
||||||
@ -917,7 +923,13 @@ func TestDeleteOne_Running(t *testing.T) {
|
|||||||
UID: "foo0",
|
UID: "foo0",
|
||||||
Namespace: api.NamespaceDefault,
|
Namespace: api.NamespaceDefault,
|
||||||
}}}
|
}}}
|
||||||
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
|
|
||||||
|
task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
task, err = reg.Register(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ const (
|
|||||||
type Registry interface {
|
type Registry interface {
|
||||||
// register the specified task with this registry, as long as the current error
|
// register the specified task with this registry, as long as the current error
|
||||||
// condition is nil. if no errors occur then return a copy of the registered task.
|
// condition is nil. if no errors occur then return a copy of the registered task.
|
||||||
Register(*T, error) (*T, error)
|
Register(*T) (*T, error)
|
||||||
|
|
||||||
// unregister the specified task from this registry
|
// unregister the specified task from this registry
|
||||||
Unregister(*T)
|
Unregister(*T)
|
||||||
@ -103,20 +103,19 @@ func (k *inMemoryRegistry) ForPod(podID string) (task *T, currentState StateType
|
|||||||
}
|
}
|
||||||
|
|
||||||
// registers a pod task unless the spec'd error is not nil
|
// registers a pod task unless the spec'd error is not nil
|
||||||
func (k *inMemoryRegistry) Register(task *T, err error) (*T, error) {
|
func (k *inMemoryRegistry) Register(task *T) (*T, error) {
|
||||||
if err == nil {
|
k.rw.Lock()
|
||||||
k.rw.Lock()
|
defer k.rw.Unlock()
|
||||||
defer k.rw.Unlock()
|
if _, found := k.podToTask[task.podKey]; found {
|
||||||
if _, found := k.podToTask[task.podKey]; found {
|
return nil, fmt.Errorf("task already registered for pod key %q", task.podKey)
|
||||||
return nil, fmt.Errorf("task already registered for pod key %q", task.podKey)
|
|
||||||
}
|
|
||||||
if _, found := k.taskRegistry[task.ID]; found {
|
|
||||||
return nil, fmt.Errorf("task already registered for id %q", task.ID)
|
|
||||||
}
|
|
||||||
k.podToTask[task.podKey] = task.ID
|
|
||||||
k.taskRegistry[task.ID] = task
|
|
||||||
}
|
}
|
||||||
return task.Clone(), err
|
if _, found := k.taskRegistry[task.ID]; found {
|
||||||
|
return nil, fmt.Errorf("task already registered for id %q", task.ID)
|
||||||
|
}
|
||||||
|
k.podToTask[task.podKey] = task.ID
|
||||||
|
k.taskRegistry[task.ID] = task
|
||||||
|
|
||||||
|
return task.Clone(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updates internal task state. updates are limited to Spec, Flags, and Offer for
|
// updates internal task state. updates are limited to Spec, Flags, and Offer for
|
||||||
|
@ -38,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
|
|||||||
|
|
||||||
// add a task
|
// add a task
|
||||||
a, _ := fakePodTask("a")
|
a, _ := fakePodTask("a")
|
||||||
a_clone, err := registry.Register(a, nil)
|
a_clone, err := registry.Register(a)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Equal(a_clone.ID, a.ID)
|
assert.Equal(a_clone.ID, a.ID)
|
||||||
assert.Equal(a_clone.podKey, a.podKey)
|
assert.Equal(a_clone.podKey, a.podKey)
|
||||||
|
|
||||||
// add another task
|
// add another task
|
||||||
b, _ := fakePodTask("b")
|
b, _ := fakePodTask("b")
|
||||||
b_clone, err := registry.Register(b, nil)
|
b_clone, err := registry.Register(b)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Equal(b_clone.ID, b.ID)
|
assert.Equal(b_clone.ID, b.ID)
|
||||||
assert.Equal(b_clone.podKey, b.podKey)
|
assert.Equal(b_clone.podKey, b.podKey)
|
||||||
@ -79,21 +79,21 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
|
|||||||
assert.Nil(task)
|
assert.Nil(task)
|
||||||
|
|
||||||
// re-add a task
|
// re-add a task
|
||||||
a_clone, err = registry.Register(a, nil)
|
a_clone, err = registry.Register(a)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
assert.Nil(a_clone)
|
assert.Nil(a_clone)
|
||||||
|
|
||||||
// re-add a task with another podKey, but same task id
|
// re-add a task with another podKey, but same task id
|
||||||
another_a := a.Clone()
|
another_a := a.Clone()
|
||||||
another_a.podKey = "another-pod"
|
another_a.podKey = "another-pod"
|
||||||
another_a_clone, err := registry.Register(another_a, nil)
|
another_a_clone, err := registry.Register(another_a)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
assert.Nil(another_a_clone)
|
assert.Nil(another_a_clone)
|
||||||
|
|
||||||
// re-add a task with another task ID, but same podKey
|
// re-add a task with another task ID, but same podKey
|
||||||
another_b := b.Clone()
|
another_b := b.Clone()
|
||||||
another_b.ID = "another-task-id"
|
another_b.ID = "another-task-id"
|
||||||
another_b_clone, err := registry.Register(another_b, nil)
|
another_b_clone, err := registry.Register(another_b)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
assert.Nil(another_b_clone)
|
assert.Nil(another_b_clone)
|
||||||
|
|
||||||
@ -124,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) {
|
|||||||
|
|
||||||
// add a task
|
// add a task
|
||||||
a, _ := fakePodTask("a")
|
a, _ := fakePodTask("a")
|
||||||
a_clone, err := registry.Register(a, nil)
|
a_clone, err := registry.Register(a)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Equal(a.State, a_clone.State)
|
assert.Equal(a.State, a_clone.State)
|
||||||
|
|
||||||
@ -167,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
|
|||||||
// create registry
|
// create registry
|
||||||
registry := NewInMemoryRegistry()
|
registry := NewInMemoryRegistry()
|
||||||
a, _ := fakePodTask("a")
|
a, _ := fakePodTask("a")
|
||||||
registry.Register(a.Clone(), nil) // here clone a because we change it below
|
registry.Register(a.Clone()) // here clone a because we change it below
|
||||||
|
|
||||||
// state changes are ignored
|
// state changes are ignored
|
||||||
a.State = StateRunning
|
a.State = StateRunning
|
||||||
@ -256,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry {
|
|||||||
|
|
||||||
registry := NewInMemoryRegistry()
|
registry := NewInMemoryRegistry()
|
||||||
a, _ := fakePodTask("a")
|
a, _ := fakePodTask("a")
|
||||||
a, _ = registry.Register(a, nil)
|
a, _ = registry.Register(a)
|
||||||
|
|
||||||
// initial pending state
|
// initial pending state
|
||||||
assert.Equal(a.State, StatePending)
|
assert.Equal(a.State, StatePending)
|
||||||
|
@ -502,7 +502,7 @@ func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.Scheduler
|
|||||||
} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
|
} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
|
||||||
if t, ok, err := podtask.RecoverFrom(*pod); ok {
|
if t, ok, err := podtask.RecoverFrom(*pod); ok {
|
||||||
log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
|
log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
|
||||||
_, err := k.taskRegistry.Register(t, nil)
|
_, err := k.taskRegistry.Register(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// someone beat us to it?!
|
// someone beat us to it?!
|
||||||
log.Warningf("failed to register recovered task: %v", err)
|
log.Warningf("failed to register recovered task: %v", err)
|
||||||
@ -912,7 +912,7 @@ func (ks *KubernetesScheduler) recoverTasks() error {
|
|||||||
log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
|
log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
|
||||||
}
|
}
|
||||||
} else if ok {
|
} else if ok {
|
||||||
ks.taskRegistry.Register(t, nil)
|
ks.taskRegistry.Register(t)
|
||||||
recoverSlave(t)
|
recoverSlave(t)
|
||||||
log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
|
log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user