diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 02b17cddee3..28defa1f32b 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -19,7 +19,6 @@ package executor import ( "encoding/json" "fmt" - "net" "strings" "sync" "sync/atomic" @@ -94,15 +93,9 @@ type kuberTask struct { type podStatusFunc func() (*api.PodStatus, error) -// KubeletInterface consists of the kubelet.Kubelet API's that we actually use -type KubeletInterface interface { - GetHostIP() (net.IP, error) -} - // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - kl KubeletInterface // the kubelet instance. updateChan chan<- interface{} // to send pod config updates to the kubelet state stateType tasks map[string]*kuberTask @@ -119,8 +112,7 @@ type KubernetesExecutor struct { kubeletFinished <-chan struct{} // signals that kubelet Run() died initialRegistration sync.Once exitFunc func(int) - podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) - staticPodsConfig []byte + podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string initialRegComplete chan struct{} podController *framework.Controller @@ -128,7 +120,6 @@ type KubernetesExecutor struct { } type Config struct { - Kubelet KubeletInterface Updates chan<- interface{} // to send pod config updates to the kubelet SourceName string APIClient *client.Client @@ -137,7 +128,7 @@ type Config struct { SuicideTimeout time.Duration KubeletFinished <-chan struct{} // signals that kubelet Run() died ExitFunc func(int) - PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) + PodStatusFunc func(*api.Pod) (*api.PodStatus, error) StaticPodsConfigPath string PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic LaunchGracePeriod time.Duration @@ -150,7 +141,6 @@ func (k *KubernetesExecutor) isConnected() bool { // New creates a new kubernetes executor. func New(config Config) *KubernetesExecutor { k := &KubernetesExecutor{ - kl: config.Kubelet, updateChan: config.Updates, state: disconnectedState, tasks: make(map[string]*kuberTask), @@ -196,6 +186,10 @@ func New(config Config) *KubernetesExecutor { return k } +func (k *KubernetesExecutor) InitialRegComplete() <-chan struct{} { + return k.initialRegComplete +} + func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) @@ -231,7 +225,7 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } if executorInfo != nil && executorInfo.Data != nil { - k.staticPodsConfig = executorInfo.Data + k.initializeStaticPodsSource(executorInfo.Data) } if slaveInfo != nil { @@ -281,24 +275,14 @@ func (k *KubernetesExecutor) onInitialRegistration() { } } -// InitializeStaticPodsSource blocks until initial regstration is complete and -// then creates a static pod source using the given factory func. -func (k *KubernetesExecutor) InitializeStaticPodsSource(sourceFactory func()) { - <-k.initialRegComplete - - if k.staticPodsConfig == nil { - return - } - +// initializeStaticPodsSource unzips the data slice into the static-pods directory +func (k *KubernetesExecutor) initializeStaticPodsSource(data []byte) { log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) - err := archive.UnzipDir(k.staticPodsConfig, k.staticPodsConfigPath) + err := archive.UnzipDir(data, k.staticPodsConfigPath) if err != nil { log.Errorf("Failed to extract static pod config: %v", err) return } - - log.V(2).Infof("initializing static pods source factory, configured at path %q", k.staticPodsConfigPath) - sourceFactory() } // Disconnected is called when the executor is disconnected from the slave. @@ -597,18 +581,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s // Delay reporting 'task running' until container is up. psf := podStatusFunc(func() (*api.PodStatus, error) { - status, err := k.podStatusFunc(k.kl, pod) - if err != nil { - return nil, err - } - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := k.kl.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil + return k.podStatusFunc(pod) }) go k._launchTask(driver, taskId, podFullName, psf) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index eb942e5d5a9..0c8a8653ac5 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -144,10 +144,6 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &fakeKubelet{ - Kubelet: &kubelet.Kubelet{}, - hostIP: net.IPv4(127, 0, 0, 1), - }, PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ @@ -159,6 +155,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { }, }, Phase: api.PodRunning, + HostIP: "127.0.0.1", }, nil }, PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, @@ -311,7 +308,6 @@ func TestExecutorStaticPods(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &kubelet.Kubelet{}, PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ @@ -393,10 +389,6 @@ func TestExecutorFrameworkMessage(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &fakeKubelet{ - Kubelet: &kubelet.Kubelet{}, - hostIP: net.IPv4(127, 0, 0, 1), - }, PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ @@ -408,6 +400,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { }, }, Phase: api.PodRunning, + HostIP: "127.0.0.1", }, nil }, ShutdownAlert: func() { diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 27200494e4d..97cc60052c0 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -368,17 +368,28 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( kubeletFinished := make(chan struct{}) staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") exec := executor.New(executor.Config{ - Kubelet: klet, - Updates: updates, - SourceName: MESOS_CFG_SOURCE, - APIClient: kc.KubeClient, - Docker: kc.DockerClient, - SuicideTimeout: ks.SuicideTimeout, + Updates: updates, + SourceName: MESOS_CFG_SOURCE, + APIClient: kc.KubeClient, + Docker: kc.DockerClient, + SuicideTimeout: ks.SuicideTimeout, LaunchGracePeriod: ks.LaunchGracePeriod, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { - return klet.GetRuntime().GetPodStatus(pod) + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + status, err := klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil }, StaticPodsConfigPath: staticPodsConfigPath, PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),