diff --git a/contrib/mesos/docs/architecture.md b/contrib/mesos/docs/architecture.md index ff9e6d80a97..937a6efb2b3 100644 --- a/contrib/mesos/docs/architecture.md +++ b/contrib/mesos/docs/architecture.md @@ -18,6 +18,25 @@ The executor launches the pod/task, which registers the bound pod with the kubel ![Architecture Diagram](architecture.png) +## Scheduling + +The scheduling of a pod on Kubernetes on Mesos is essentially a two-phase process: + +1. A new pod is noticed by the k8sm-scheduler and possibly matched with a + Mesos offer. Then: + + - The offer is *accepted*, + - the pod is *annotated* with a number of annotation, especially `k8s.mesosphere.io/bindingHost` + - the pod is *launched* on a Mesos slave. + + The existence of the `bindingHost` annotation tells the k8sm-scheduler that this pod has been launched. If it is not set, the pod is considered *new*. + +2. The Mesos slave receives the task launch event and starts (if not running yet) the k8sm-executor (possibly via the km hyperkube binary). Then: + + - The k8sm-executor *binds* the tasks to the node via the apiserver, which means that the `NodeName` field is set by the apiserver. + - The k8sm-executor sends the pod to the kubelet which is part of the k8sm-executor process. + - The kubelet launches the containers using Docker. + ## Networking Kubernetes-Mesos uses "normal" Docker IPv4, host-private networking, rather than Kubernetes' SDN-based networking that assigns an IP per pod. This is mostly transparent to the user, especially when using the service abstraction to access pods. For details on some issues it creates, see [issues][3]. diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index c8da632abd2..575a0875380 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -437,25 +437,6 @@ func (k *KubernetesExecutor) attemptSuicide(driver bindings.ExecutorDriver, abor // async continuation of LaunchTask func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) { - - //HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/scheduler.go - binding := &api.Binding{ - ObjectMeta: api.ObjectMeta{ - Namespace: pod.Namespace, - Name: pod.Name, - Annotations: make(map[string]string), - }, - Target: api.ObjectReference{ - Kind: "Node", - Name: pod.Annotations[meta.BindingHostKey], - }, - } - - // forward the annotations that the scheduler wants to apply - for k, v := range pod.Annotations { - binding.Annotations[k] = v - } - deleteTask := func() { k.lock.Lock() defer k.lock.Unlock() @@ -463,17 +444,57 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s k.resetSuicideWatch(driver) } - log.Infof("Binding '%v/%v' to '%v' with annotations %+v...", pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations) - ctx := api.WithNamespace(api.NewContext(), binding.Namespace) // TODO(k8s): use Pods interface for binding once clusters are upgraded // return b.Pods(binding.Namespace).Bind(binding) - err := k.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error() - if err != nil { - deleteTask() - k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED, - messages.CreateBindingFailure)) - return + if pod.Spec.NodeName == "" { + //HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/scheduler.go + binding := &api.Binding{ + ObjectMeta: api.ObjectMeta{ + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: make(map[string]string), + }, + Target: api.ObjectReference{ + Kind: "Node", + Name: pod.Annotations[meta.BindingHostKey], + }, + } + + // forward the annotations that the scheduler wants to apply + for k, v := range pod.Annotations { + binding.Annotations[k] = v + } + + // create binding on apiserver + log.Infof("Binding '%v/%v' to '%v' with annotations %+v...", pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations) + ctx := api.WithNamespace(api.NewContext(), binding.Namespace) + err := k.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error() + if err != nil { + deleteTask() + k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED, + messages.CreateBindingFailure)) + return + } + } else { + // post annotations update to apiserver + patch := struct { + Metadata struct { + Annotations map[string]string `json:"annotations"` + } `json:"metadata"` + }{} + patch.Metadata.Annotations = pod.Annotations + patchJson, _ := json.Marshal(patch) + log.V(4).Infof("Patching annotations %v of pod %v/%v: %v", pod.Annotations, pod.Namespace, pod.Name, string(patchJson)) + err := k.client.Patch(api.MergePatchType).RequestURI(pod.SelfLink).Body(patchJson).Do().Error() + if err != nil { + log.Errorf("Error updating annotations of ready-to-launch pod %v/%v: %v", pod.Namespace, pod.Name, err) + deleteTask() + k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED, + messages.AnnotationUpdateFailure)) + return + } } + podFullName := container.GetPodFullName(pod) // allow a recently failed-over scheduler the chance to recover the task/pod binding: diff --git a/contrib/mesos/pkg/executor/messages/messages.go b/contrib/mesos/pkg/executor/messages/messages.go index bf3dd1a9888..f3287b2c9f5 100644 --- a/contrib/mesos/pkg/executor/messages/messages.go +++ b/contrib/mesos/pkg/executor/messages/messages.go @@ -29,4 +29,6 @@ const ( UnmarshalTaskDataFailure = "unmarshal-task-data-failure" TaskLostAck = "task-lost-ack" // executor acknowledgement of forwarded TASK_LOST framework message Kamikaze = "kamikaze" + WrongSlaveFailure = "pod-for-wrong-slave-failure" + AnnotationUpdateFailure = "annotation-update-failure" ) diff --git a/contrib/mesos/pkg/scheduler/meta/annotations.go b/contrib/mesos/pkg/scheduler/meta/annotations.go index 5c9bf099182..dc8dde231e7 100644 --- a/contrib/mesos/pkg/scheduler/meta/annotations.go +++ b/contrib/mesos/pkg/scheduler/meta/annotations.go @@ -18,7 +18,10 @@ package meta // kubernetes api object annotations const ( - BindingHostKey = "k8s.mesosphere.io/bindingHost" + // the BindingHostKey pod annotation marks a pod as being assigned to a Mesos + // slave. It is already or will be launched on the slave as a task. + BindingHostKey = "k8s.mesosphere.io/bindingHost" + TaskIdKey = "k8s.mesosphere.io/taskId" SlaveIdKey = "k8s.mesosphere.io/slaveId" OfferIdKey = "k8s.mesosphere.io/offerId" diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index c0b1f2666ae..06241e150bf 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -151,6 +151,11 @@ func (b *binder) rollback(task *podtask.T, err error) error { } // assumes that: caller has acquired scheduler lock and that the task is still pending +// +// bind does not actually do the binding itself, but launches the pod as a Mesos task. The +// kubernetes executor on the slave will finally do the binding. This is different from the +// upstream scheduler in the sense that the upstream scheduler does the binding and the +// kubelet will notice that and launches the pod. func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) { // sanity check: ensure that the task hasAcceptedOffer(), it's possible that between // Schedule() and now that the offer for this task was rescinded or invalidated. @@ -193,16 +198,7 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod oemCt := pod.Spec.Containers pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod - if pod.Annotations == nil { - pod.Annotations = make(map[string]string) - } else { - oemAnn := pod.Annotations - pod.Annotations = make(map[string]string) - for k, v := range oemAnn { - pod.Annotations[k] = v - } - } - pod.Annotations[annotation.BindingHostKey] = machine + annotateForExecutorOnSlave(&pod, machine) task.SaveRecoveryInfo(pod.Annotations) for _, entry := range task.Spec.PortMap { @@ -237,6 +233,31 @@ type kubeScheduler struct { defaultContainerMemLimit mresource.MegaBytes } +// annotatedForExecutor checks whether a pod is assigned to a Mesos slave, and +// possibly already launched. It can, but doesn't have to be scheduled already +// in the sense of kubernetes, i.e. the NodeName field might still be empty. +func annotatedForExecutor(pod *api.Pod) bool { + _, ok := pod.ObjectMeta.Annotations[annotation.BindingHostKey] + return ok +} + +// annotateForExecutorOnSlave sets the BindingHostKey annotation which +// marks the pod to be processed by the scheduler and launched as a Mesos +// task. The executor on the slave will to the final binding to finish the +// scheduling in the kubernetes sense. +func annotateForExecutorOnSlave(pod *api.Pod, slave string) { + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } else { + oemAnn := pod.Annotations + pod.Annotations = make(map[string]string) + for k, v := range oemAnn { + pod.Annotations[k] = v + } + } + pod.Annotations[annotation.BindingHostKey] = slave +} + // Schedule implements the Scheduler interface of Kubernetes. // It returns the selectedMachine's name and error (if there's any). func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.MinionLister) (string, error) { @@ -441,7 +462,7 @@ func (q *queuer) Run(done <-chan struct{}) { } pod := p.(*Pod) - if pod.Spec.NodeName != "" { + if annotatedForExecutor(pod.Pod) { log.V(3).Infof("dequeuing pod for scheduling: %v", pod.Pod.Name) q.dequeue(pod.GetUID()) } else { @@ -490,7 +511,7 @@ func (q *queuer) yield() *api.Pod { log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err) } else if !q.podUpdates.Poll(podName, queue.POP_EVENT) { log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod) - } else if pod.Spec.NodeName != "" { + } else if annotatedForExecutor(pod) { // should never happen if enqueuePods is filtering properly log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod) } else { @@ -798,7 +819,7 @@ func (s *schedulingPlugin) reconcilePod(oldPod api.Pod) { return } if oldPod.Spec.NodeName != pod.Spec.NodeName { - if pod.Spec.NodeName == "" { + if annotatedForExecutor(pod) { // pod is unscheduled. // it's possible that we dropped the pod in the scheduler error handler // because of task misalignment with the pod (task.Has(podtask.Launched) == true) diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index 9400c60b672..5a55eec4615 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -37,6 +37,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/queue" schedcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/config" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" log "github.com/golang/glog" @@ -189,8 +190,11 @@ func (lw *MockPodsListWatch) Delete(pod *api.Pod, notify bool) { } // Create a pod with a given index, requiring one port -func NewTestPod(i int) *api.Pod { - name := fmt.Sprintf("pod%d", i) +var currentPodNum int = 0 + +func NewTestPod() (*api.Pod, int) { + currentPodNum = currentPodNum + 1 + name := fmt.Sprintf("pod%d", currentPodNum) return &api.Pod{ TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, ObjectMeta: api.ObjectMeta{ @@ -203,7 +207,7 @@ func NewTestPod(i int) *api.Pod { { Ports: []api.ContainerPort{ { - ContainerPort: 8000 + i, + ContainerPort: 8000 + currentPodNum, Protocol: api.ProtocolTCP, }, }, @@ -211,7 +215,7 @@ func NewTestPod(i int) *api.Pod { }, }, Status: api.PodStatus{ - PodIP: fmt.Sprintf("1.2.3.%d", 4+i), + PodIP: fmt.Sprintf("1.2.3.%d", 4+currentPodNum), Conditions: []api.PodCondition{ { Type: api.PodReady, @@ -219,12 +223,12 @@ func NewTestPod(i int) *api.Pod { }, }, }, - } + }, currentPodNum } // Offering some cpus and memory and the 8000-9000 port range -func NewTestOffer(i int) *mesos.Offer { - hostname := fmt.Sprintf("h%d", i) +func NewTestOffer(id string) *mesos.Offer { + hostname := "some_hostname" cpus := util.NewScalarResource("cpus", 3.75) mem := util.NewScalarResource("mem", 940) var port8000 uint64 = 8000 @@ -232,7 +236,7 @@ func NewTestOffer(i int) *mesos.Offer { ports8000to9000 := mesos.Value_Range{Begin: &port8000, End: &port9000} ports := util.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000}) return &mesos.Offer{ - Id: util.NewOfferID(fmt.Sprintf("offer%d", i)), + Id: util.NewOfferID(id), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname), Resources: []*mesos.Resource{cpus, mem, ports}, @@ -435,11 +439,20 @@ func TestPlugin_LifeCycle(t *testing.T) { mockDriver.On("SendFrameworkMessage", mAny("*mesosproto.ExecutorID"), mAny("*mesosproto.SlaveID"), mAny("string")). Return(mesos.Status_DRIVER_RUNNING, nil) - launchedTasks := make(chan *mesos.TaskInfo, 1) + type LaunchedTask struct { + offerId mesos.OfferID + taskInfo *mesos.TaskInfo + } + launchedTasks := make(chan LaunchedTask, 1) launchTasksCalledFunc := func(args mock.Arguments) { + offerIDs := args.Get(0).([]*mesos.OfferID) taskInfos := args.Get(1).([]*mesos.TaskInfo) + assert.Equal(1, len(offerIDs)) assert.Equal(1, len(taskInfos)) - launchedTasks <- taskInfos[0] + launchedTasks <- LaunchedTask{ + offerId: *offerIDs[0], + taskInfo: taskInfos[0], + } } mockDriver.On("LaunchTasks", mAny("[]*mesosproto.OfferID"), mAny("[]*mesosproto.TaskInfo"), mAny("*mesosproto.Filters")). Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksCalledFunc) @@ -469,30 +482,30 @@ func TestPlugin_LifeCycle(t *testing.T) { //TODO(jdef) refactor things above here into a test suite setup of some sort // fake new, unscheduled pod - pod1 := NewTestPod(1) - podListWatch.Add(pod1, true) // notify watchers + pod, i := NewTestPod() + podListWatch.Add(pod, true) // notify watchers // wait for failedScheduling event because there is no offer assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received") // add some matching offer - offers1 := []*mesos.Offer{NewTestOffer(1)} - testScheduler.ResourceOffers(nil, offers1) + offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))} + testScheduler.ResourceOffers(nil, offers) // and wait for scheduled pod assert.EventWithReason(eventObserver, "scheduled") select { case launchedTask := <-launchedTasks: // report back that the task has been staged, and then started by mesos - testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_STAGING)) - testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_RUNNING)) + testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING)) + testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING)) // check that ExecutorInfo.data has the static pod data - assert.Len(launchedTask.Executor.Data, 3) + assert.Len(launchedTask.taskInfo.Executor.Data, 3) // report back that the task has been lost mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0) - testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_LOST)) + testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST)) // and wait that framework message is sent to executor mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1) @@ -501,20 +514,15 @@ func TestPlugin_LifeCycle(t *testing.T) { t.Fatalf("timed out waiting for launchTasks call") } - // start another pod - podNum := 2 - startPod := func() (*api.Pod, *mesos.TaskInfo, *mesos.Offer) { - podNum = podNum + 1 - - // create pod - pod := NewTestPod(podNum) - podListWatch.Add(pod, true) // notify watchers + // define generic pod startup + startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) { + // notify watchers of new pod + podListWatch.Add(pod, true) // wait for failedScheduling event because there is no offer assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received") // supply a matching offer - offers := []*mesos.Offer{NewTestOffer(podNum)} testScheduler.ResourceOffers(mockDriver, offers) // and wait to get scheduled @@ -523,9 +531,15 @@ func TestPlugin_LifeCycle(t *testing.T) { // wait for driver.launchTasks call select { case launchedTask := <-launchedTasks: - testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_STAGING)) - testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_RUNNING)) - return pod, launchedTask, offers[0] + testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING)) + testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING)) + for _, offer := range offers { + if offer.Id.GetValue() == launchedTask.offerId.GetValue() { + return pod, &launchedTask, offer + } + } + t.Fatalf("unknown offer used to start a pod") + return nil, nil, nil case <-time.After(5 * time.Second): t.Fatal("timed out waiting for launchTasks") @@ -533,12 +547,19 @@ func TestPlugin_LifeCycle(t *testing.T) { } } - pod, launchedTask, _ := startPod() + startTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) { + pod, i := NewTestPod() + offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))} + return startPodWithOffers(pod, offers) + } + + // start another pod + pod, launchedTask, _ := startTestPod() // mock drvier.KillTask, should be invoked when a pod is deleted mockDriver.On("KillTask", mAny("*mesosproto.TaskID")).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) { killedTaskId := *(args.Get(0).(*mesos.TaskID)) - assert.Equal(*launchedTask.TaskId, killedTaskId, "expected same TaskID as during launch") + assert.Equal(*launchedTask.taskInfo.TaskId, killedTaskId, "expected same TaskID as during launch") }) killTaskCalled := mockDriver.Upon() @@ -549,12 +570,31 @@ func TestPlugin_LifeCycle(t *testing.T) { select { case <-killTaskCalled: // report back that the task is finished - testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_FINISHED)) + testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED)) case <-time.After(5 * time.Second): t.Fatal("timed out waiting for KillTask") } + // start a pod with on a given NodeName and check that it is scheduled to the right host + pod, i = NewTestPod() + pod.Spec.NodeName = "hostname1" + offers = []*mesos.Offer{} + for j := 0; j < 3; j++ { + offer := NewTestOffer(fmt.Sprintf("offer%d_%d", i, j)) + hostname := fmt.Sprintf("hostname%d", j) + offer.Hostname = &hostname + offers = append(offers, offer) + } + + _, _, usedOffer := startPodWithOffers(pod, offers) + + assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue()) + assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname) + + testScheduler.OfferRescinded(mockDriver, offers[0].Id) + testScheduler.OfferRescinded(mockDriver, offers[2].Id) + // start pods: // - which are failing while binding, // - leading to reconciliation @@ -574,26 +614,30 @@ func TestPlugin_LifeCycle(t *testing.T) { } // 1. with pod deleted from the apiserver - pod, launchedTask, _ = startPod() + pod, launchedTask, _ = startTestPod() podListWatch.Delete(pod, false) // not notifying the watchers - failPodFromExecutor(launchedTask) + failPodFromExecutor(launchedTask.taskInfo) // 2. with pod still on the apiserver, not bound - pod, launchedTask, _ = startPod() - failPodFromExecutor(launchedTask) + pod, launchedTask, _ = startTestPod() + failPodFromExecutor(launchedTask.taskInfo) // 3. with pod still on the apiserver, bound i.e. host!="" - pod, launchedTask, usedOffer := startPod() - pod.Spec.NodeName = *usedOffer.Hostname + pod, launchedTask, usedOffer = startTestPod() + pod.Annotations = map[string]string{ + meta.BindingHostKey: *usedOffer.Hostname, + } podListWatch.Modify(pod, false) // not notifying the watchers - failPodFromExecutor(launchedTask) + failPodFromExecutor(launchedTask.taskInfo) // 4. with pod still on the apiserver, bound i.e. host!="", notified via ListWatch - pod, launchedTask, usedOffer = startPod() - pod.Spec.NodeName = *usedOffer.Hostname + pod, launchedTask, usedOffer = startTestPod() + pod.Annotations = map[string]string{ + meta.BindingHostKey: *usedOffer.Hostname, + } podListWatch.Modify(pod, true) // notifying the watchers time.Sleep(time.Second / 2) - failPodFromExecutor(launchedTask) + failPodFromExecutor(launchedTask.taskInfo) } func TestDeleteOne_NonexistentPod(t *testing.T) { diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go index 07833c51afe..a557f9c6fe0 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go @@ -213,6 +213,11 @@ func (t *T) AcceptOffer(offer *mesos.Offer) bool { return false } + // if the user has specified a target host, make sure this offer is for that host + if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName { + return false + } + // check ports if _, err := t.mapper.Generate(t, offer); err != nil { log.V(3).Info(err)