diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 6a024fddb93..62c2bba44bd 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -19,6 +19,7 @@ package executor import ( "encoding/json" "fmt" + "net" "strings" "sync" "sync/atomic" @@ -42,7 +43,7 @@ import ( ) const ( - containerPollTime = 300 * time.Millisecond + containerPollTime = 1 * time.Second launchGracePeriod = 5 * time.Minute ) @@ -97,10 +98,15 @@ type suicideWatcher interface { type podStatusFunc func() (*api.PodStatus, error) +// KubeletInterface consists of the kubelet.Kubelet API's that we actually use +type KubeletInterface interface { + GetHostIP() (net.IP, error) +} + // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - kl *kubelet.Kubelet // the kubelet instance. + kl KubeletInterface // the kubelet instance. updateChan chan<- interface{} // to send pod config updates to the kubelet state stateType tasks map[string]*kuberTask @@ -118,11 +124,11 @@ type KubernetesExecutor struct { kubeletFinished <-chan struct{} // signals that kubelet Run() died initialRegistration sync.Once exitFunc func(int) - podStatusFunc func(*kubelet.Kubelet, *api.Pod) (*api.PodStatus, error) + podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) } type Config struct { - Kubelet *kubelet.Kubelet + Kubelet KubeletInterface Updates chan<- interface{} // to send pod config updates to the kubelet SourceName string APIClient *client.Client @@ -132,7 +138,7 @@ type Config struct { SuicideTimeout time.Duration KubeletFinished <-chan struct{} // signals that kubelet Run() died ExitFunc func(int) - PodStatusFunc func(*kubelet.Kubelet, *api.Pod) (*api.PodStatus, error) + PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) } func (k *KubernetesExecutor) isConnected() bool { @@ -490,7 +496,18 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s // Delay reporting 'task running' until container is up. psf := podStatusFunc(func() (*api.PodStatus, error) { - return k.podStatusFunc(k.kl, pod) + status, err := k.podStatusFunc(k.kl, pod) + if err != nil { + return nil, err + } + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := k.kl.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil }) go k._launchTask(driver, taskId, podFullName, psf) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 0954454422d..b94deaee182 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -18,6 +18,7 @@ package executor import ( "fmt" + "net" "net/http" "net/http/httptest" "reflect" @@ -285,6 +286,15 @@ func TestExecutorReregister(t *testing.T) { mockDriver.AssertExpectations(t) } +type fakeKubelet struct { + *kubelet.Kubelet + hostIP net.IP +} + +func (kl *fakeKubelet) GetHostIP() (net.IP, error) { + return kl.hostIP, nil +} + // TestExecutorLaunchAndKillTask ensures that the executor is able to launch // and kill tasks while properly bookkeping its tasks. func TestExecutorLaunchAndKillTask(t *testing.T) { @@ -304,8 +314,11 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Version(), }), - Kubelet: &kubelet.Kubelet{}, - PodStatusFunc: func(kl *kubelet.Kubelet, pod *api.Pod) (*api.PodStatus, error) { + Kubelet: &fakeKubelet{ + Kubelet: &kubelet.Kubelet{}, + hostIP: net.IPv4(127, 0, 0, 1), + }, + PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -489,6 +502,7 @@ func NewTestPod(i int) *api.Pod { Spec: api.PodSpec{ Containers: []api.Container{ { + Name: "foo", Ports: []api.ContainerPort{ { ContainerPort: 8000 + i, diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index be3335c0dc9..dc8ba9beabe 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -376,8 +376,8 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( } }, ExitFunc: os.Exit, - PodStatusFunc: func(kl *kubelet.Kubelet, pod *api.Pod) (*api.PodStatus, error) { - return kl.GetRuntime().GetPodStatus(pod) + PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + return klet.GetRuntime().GetPodStatus(pod) }, })