Un-knot static pods creation in executor

This commit is contained in:
Dr. Stefan Schimanski 2015-08-21 16:50:54 +02:00
parent 686b767f28
commit efdd726027
3 changed files with 26 additions and 20 deletions

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"

View File

@ -144,7 +144,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
Host: testApiServer.server.URL, Host: testApiServer.server.URL,
Version: testapi.Default.Version(), Version: testapi.Default.Version(),
}), }),
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{ return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{ ContainerStatuses: []api.ContainerStatus{
{ {
@ -300,7 +300,6 @@ func TestExecutorStaticPods(t *testing.T) {
defer os.RemoveAll(staticPodsConfigPath) defer os.RemoveAll(staticPodsConfigPath)
mockDriver := &MockExecutorDriver{} mockDriver := &MockExecutorDriver{}
updates := make(chan interface{}, 1024)
config := Config{ config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"), Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init
@ -308,7 +307,7 @@ func TestExecutorStaticPods(t *testing.T) {
Host: testApiServer.server.URL, Host: testApiServer.server.URL,
Version: testapi.Default.Version(), Version: testapi.Default.Version(),
}), }),
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{ return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{ ContainerStatuses: []api.ContainerStatus{
{ {
@ -325,10 +324,11 @@ func TestExecutorStaticPods(t *testing.T) {
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
} }
executor := New(config) executor := New(config)
// register static pod source
hostname := "h1" hostname := "h1"
go executor.InitializeStaticPodsSource(func() { fileSourceUpdates := make(chan interface{}, 1024)
kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates) kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, fileSourceUpdates)
})
// create ExecutorInfo with static pod zip in data field // create ExecutorInfo with static pod zip in data field
executorInfo := mesosutil.NewExecutorInfo( executorInfo := mesosutil.NewExecutorInfo(
@ -350,7 +350,7 @@ func TestExecutorStaticPods(t *testing.T) {
select { select {
case <-timeout: case <-timeout:
t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods)) t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods))
case update, ok := <-updates: case update, ok := <-fileSourceUpdates:
if !ok { if !ok {
return return
} }
@ -389,7 +389,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
Host: testApiServer.server.URL, Host: testApiServer.server.URL,
Version: testapi.Default.Version(), Version: testapi.Default.Version(),
}), }),
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{ return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{ ContainerStatuses: []api.ContainerStatus{
{ {

View File

@ -363,10 +363,16 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
return nil, nil, err return nil, nil, err
} }
// create static pods directory
staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods")
err = os.Mkdir(staticPodsConfigPath, 0755)
if err != nil {
return nil, nil, err
}
//TODO(jdef) either configure Watch here with something useful, or else //TODO(jdef) either configure Watch here with something useful, or else
// get rid of it from executor.Config // get rid of it from executor.Config
kubeletFinished := make(chan struct{}) kubeletFinished := make(chan struct{})
staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods")
exec := executor.New(executor.Config{ exec := executor.New(executor.Config{
Updates: updates, Updates: updates,
SourceName: MESOS_CFG_SOURCE, SourceName: MESOS_CFG_SOURCE,
@ -395,13 +401,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),
}) })
go exec.InitializeStaticPodsSource(func() {
// Create file source only when we are called back. Otherwise, it is never marked unseen.
fileSourceUpdates := pc.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates)
})
k := &kubeletExecutor{ k := &kubeletExecutor{
Kubelet: klet, Kubelet: klet,
address: ks.Address, address: ks.Address,
@ -423,12 +422,20 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
k.driver = driver k.driver = driver
} }
log.V(2).Infof("Initialize executor driver...")
k.BirthCry() k.BirthCry()
k.StartGarbageCollection()
log.V(2).Infof("Initialize executor driver...")
exec.Init(k.driver) exec.Init(k.driver)
k.StartGarbageCollection() <- exec.InitialRegComplete()
// from here the executor is registered with the Mesos master
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := pc.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates)
return k, pc, nil return k, pc, nil
} }