From efdd72602795e31e5940aae537e74503c9d93516 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 21 Aug 2015 16:50:54 +0200 Subject: [PATCH] Un-knot static pods creation in executor --- contrib/mesos/pkg/executor/executor.go | 1 - contrib/mesos/pkg/executor/executor_test.go | 16 +++++----- contrib/mesos/pkg/executor/service/service.go | 29 ++++++++++++------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 28defa1f32b..d44b9b2248c 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 0c8a8653ac5..d2c908c5620 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -144,7 +144,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -300,7 +300,6 @@ func TestExecutorStaticPods(t *testing.T) { defer os.RemoveAll(staticPodsConfigPath) mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init @@ -308,7 +307,7 @@ func TestExecutorStaticPods(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -325,10 +324,11 @@ func TestExecutorStaticPods(t *testing.T) { PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) + + // register static pod source hostname := "h1" - go executor.InitializeStaticPodsSource(func() { - kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates) - }) + fileSourceUpdates := make(chan interface{}, 1024) + kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, fileSourceUpdates) // create ExecutorInfo with static pod zip in data field executorInfo := mesosutil.NewExecutorInfo( @@ -350,7 +350,7 @@ func TestExecutorStaticPods(t *testing.T) { select { case <-timeout: t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods)) - case update, ok := <-updates: + case update, ok := <-fileSourceUpdates: if !ok { return } @@ -389,7 +389,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 97cc60052c0..40989f8c43e 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -363,10 +363,16 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( return nil, nil, err } + // create static pods directory + staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") + err = os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return nil, nil, err + } + //TODO(jdef) either configure Watch here with something useful, or else // get rid of it from executor.Config kubeletFinished := make(chan struct{}) - staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") exec := executor.New(executor.Config{ Updates: updates, SourceName: MESOS_CFG_SOURCE, @@ -395,13 +401,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), }) - go exec.InitializeStaticPodsSource(func() { - // Create file source only when we are called back. Otherwise, it is never marked unseen. - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - }) - k := &kubeletExecutor{ Kubelet: klet, address: ks.Address, @@ -423,12 +422,20 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( k.driver = driver } - log.V(2).Infof("Initialize executor driver...") - k.BirthCry() + k.StartGarbageCollection() + + log.V(2).Infof("Initialize executor driver...") exec.Init(k.driver) - k.StartGarbageCollection() + <- exec.InitialRegComplete() + + // from here the executor is registered with the Mesos master + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := pc.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) return k, pc, nil }