From 9366ac4143bf927c7fc413c07c5a8e9ddd5509f7 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 29 Sep 2015 13:21:59 +0200 Subject: [PATCH 01/12] Pass PodLW to executor in tests --- contrib/mesos/pkg/executor/executor.go | 6 +++++- contrib/mesos/pkg/executor/executor_test.go | 17 ++++++++--------- contrib/mesos/pkg/executor/mock_test.go | 14 +++++++++----- contrib/mesos/pkg/executor/suicide_test.go | 7 +++---- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 7b6fd5bdd03..254e0b219e5 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -139,7 +139,7 @@ type Config struct { ExitFunc func(int) PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) StaticPodsConfigPath string - PodLW cache.ListerWatcher + PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic LaunchGracePeriod time.Duration } @@ -172,6 +172,10 @@ func New(config Config) *KubernetesExecutor { } // watch pods from the given pod ListWatch + if config.PodLW == nil { + // fail early to make debugging easier + panic("cannot create executor with nil PodLW") + } _, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*api.Pod) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 1e127977bf2..eb942e5d5a9 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -57,12 +57,7 @@ import ( // after Register is called. 
func TestExecutorRegister(t *testing.T) { mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) - executor := New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: updates, - SourceName: "executor_test", - }) + executor, updates := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) @@ -95,7 +90,7 @@ func TestExecutorRegister(t *testing.T) { // connected after a call to Disconnected has occurred. func TestExecutorDisconnect(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) @@ -110,7 +105,7 @@ func TestExecutorDisconnect(t *testing.T) { // after a connection problem happens, followed by a call to Reregistered. func TestExecutorReregister(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) @@ -166,6 +161,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { Phase: api.PodRunning, }, nil }, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) @@ -330,6 +326,7 @@ func TestExecutorStaticPods(t *testing.T) { }, nil }, StaticPodsConfigPath: staticPodsConfigPath, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) hostname := "h1" @@ -417,6 +414,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { close(kubeletFinished) }, KubeletFinished: kubeletFinished, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) @@ -579,6 +577,7 @@ func TestExecutorShutdown(t *testing.T) { ExitFunc: func(_ int) { atomic.AddInt32(&exitCalled, 1) }, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) @@ -608,7 +607,7 @@ func TestExecutorShutdown(t *testing.T) { 
func TestExecutorsendFrameworkMessage(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index be4951eab58..727a66336bb 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -22,6 +22,7 @@ import ( "github.com/mesos/mesos-go/mesosproto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/kubelet/dockertools" ) @@ -64,16 +65,19 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status return args.Get(0).(mesosproto.Status), args.Error(1) } -func NewTestKubernetesExecutor() *KubernetesExecutor { +func NewTestKubernetesExecutor() (*KubernetesExecutor, chan interface{}) { + updates := make(chan interface{}, 1024) return New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), - }) + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: updates, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, + SourceName: "executor_test", + }), updates } func TestExecutorNew(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) assert.Equal(t, executor.isDone(), false, "executor should not be in Done state on initialization") diff --git a/contrib/mesos/pkg/executor/suicide_test.go b/contrib/mesos/pkg/executor/suicide_test.go index 706ad2876e2..5426433f671 100644 --- a/contrib/mesos/pkg/executor/suicide_test.go +++ b/contrib/mesos/pkg/executor/suicide_test.go @@ -67,7 +67,7 @@ func (t *suicideTracker) makeJumper(_ jumper) jumper { func TestSuicide_zeroTimeout(t *testing.T) { defer glog.Flush() - k := New(Config{}) 
+ k, _ := NewTestKubernetesExecutor() tracker := &suicideTracker{suicideWatcher: k.suicideWatch} k.suicideWatch = tracker @@ -92,9 +92,8 @@ func TestSuicide_zeroTimeout(t *testing.T) { func TestSuicide_WithTasks(t *testing.T) { defer glog.Flush() - k := New(Config{ - SuicideTimeout: 50 * time.Millisecond, - }) + k, _ := NewTestKubernetesExecutor() + k.suicideTimeout = 50 * time.Millisecond jumps := uint32(0) tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps} From 6af86cbaadfb6c1c9cd3c58cd44deafb8fedc28e Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 23 Sep 2015 16:37:25 +0200 Subject: [PATCH 02/12] Avoid panics during executor shutdown due to write to closed channel --- contrib/mesos/pkg/executor/executor.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 254e0b219e5..02b17cddee3 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -107,7 +107,7 @@ type KubernetesExecutor struct { state stateType tasks map[string]*kuberTask pods map[string]*api.Pod - lock sync.RWMutex + lock sync.Mutex sourcename string client *client.Client done chan struct{} // signals shutdown @@ -269,6 +269,11 @@ func (k *KubernetesExecutor) onInitialRegistration() { defer close(k.initialRegComplete) // emit an empty update to allow the mesos "source" to be marked as seen + k.lock.Lock() + defer k.lock.Unlock() + if k.isDone() { + return + } k.updateChan <- kubetypes.PodUpdate{ Pods: []*api.Pod{}, Op: kubetypes.SET, @@ -397,6 +402,9 @@ func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) { oldPod.DeletionTimestamp = pod.DeletionTimestamp oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds + if k.isDone() { + return + } update := kubetypes.PodUpdate{ Op: kubetypes.UPDATE, Pods: []*api.Pod{oldPod}, @@ -570,6 +578,9 @@ func (k *KubernetesExecutor) 
launchTask(driver bindings.ExecutorDriver, taskId s k.pods[podFullName] = pod // send the new pod to the kubelet which will spin it up + if k.isDone() { + return + } update := kubetypes.PodUpdate{ Op: kubetypes.ADD, Pods: []*api.Pod{pod}, @@ -775,6 +786,9 @@ func (k *KubernetesExecutor) removePodTask(driver bindings.ExecutorDriver, tid, delete(k.pods, pid) // tell the kubelet to remove the pod + if k.isDone() { + return + } update := kubetypes.PodUpdate{ Op: kubetypes.REMOVE, Pods: []*api.Pod{pod}, From 686b767f289b7b832747504ff7e7a8887c384f81 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 21 Aug 2015 15:23:44 +0200 Subject: [PATCH 03/12] Remove redundant kubelet dependency of executor --- contrib/mesos/pkg/executor/executor.go | 49 +++++-------------- contrib/mesos/pkg/executor/executor_test.go | 11 +---- contrib/mesos/pkg/executor/service/service.go | 31 ++++++++---- 3 files changed, 34 insertions(+), 57 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 02b17cddee3..28defa1f32b 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -19,7 +19,6 @@ package executor import ( "encoding/json" "fmt" - "net" "strings" "sync" "sync/atomic" @@ -94,15 +93,9 @@ type kuberTask struct { type podStatusFunc func() (*api.PodStatus, error) -// KubeletInterface consists of the kubelet.Kubelet API's that we actually use -type KubeletInterface interface { - GetHostIP() (net.IP, error) -} - // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - kl KubeletInterface // the kubelet instance. 
updateChan chan<- interface{} // to send pod config updates to the kubelet state stateType tasks map[string]*kuberTask @@ -119,8 +112,7 @@ type KubernetesExecutor struct { kubeletFinished <-chan struct{} // signals that kubelet Run() died initialRegistration sync.Once exitFunc func(int) - podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) - staticPodsConfig []byte + podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string initialRegComplete chan struct{} podController *framework.Controller @@ -128,7 +120,6 @@ type KubernetesExecutor struct { } type Config struct { - Kubelet KubeletInterface Updates chan<- interface{} // to send pod config updates to the kubelet SourceName string APIClient *client.Client @@ -137,7 +128,7 @@ type Config struct { SuicideTimeout time.Duration KubeletFinished <-chan struct{} // signals that kubelet Run() died ExitFunc func(int) - PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) + PodStatusFunc func(*api.Pod) (*api.PodStatus, error) StaticPodsConfigPath string PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic LaunchGracePeriod time.Duration @@ -150,7 +141,6 @@ func (k *KubernetesExecutor) isConnected() bool { // New creates a new kubernetes executor. 
func New(config Config) *KubernetesExecutor { k := &KubernetesExecutor{ - kl: config.Kubelet, updateChan: config.Updates, state: disconnectedState, tasks: make(map[string]*kuberTask), @@ -196,6 +186,10 @@ func New(config Config) *KubernetesExecutor { return k } +func (k *KubernetesExecutor) InitialRegComplete() <-chan struct{} { + return k.initialRegComplete +} + func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) @@ -231,7 +225,7 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } if executorInfo != nil && executorInfo.Data != nil { - k.staticPodsConfig = executorInfo.Data + k.initializeStaticPodsSource(executorInfo.Data) } if slaveInfo != nil { @@ -281,24 +275,14 @@ func (k *KubernetesExecutor) onInitialRegistration() { } } -// InitializeStaticPodsSource blocks until initial regstration is complete and -// then creates a static pod source using the given factory func. -func (k *KubernetesExecutor) InitializeStaticPodsSource(sourceFactory func()) { - <-k.initialRegComplete - - if k.staticPodsConfig == nil { - return - } - +// initializeStaticPodsSource unzips the data slice into the static-pods directory +func (k *KubernetesExecutor) initializeStaticPodsSource(data []byte) { log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) - err := archive.UnzipDir(k.staticPodsConfig, k.staticPodsConfigPath) + err := archive.UnzipDir(data, k.staticPodsConfigPath) if err != nil { log.Errorf("Failed to extract static pod config: %v", err) return } - - log.V(2).Infof("initializing static pods source factory, configured at path %q", k.staticPodsConfigPath) - sourceFactory() } // Disconnected is called when the executor is disconnected from the slave. @@ -597,18 +581,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s // Delay reporting 'task running' until container is up. 
psf := podStatusFunc(func() (*api.PodStatus, error) { - status, err := k.podStatusFunc(k.kl, pod) - if err != nil { - return nil, err - } - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := k.kl.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil + return k.podStatusFunc(pod) }) go k._launchTask(driver, taskId, podFullName, psf) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index eb942e5d5a9..0c8a8653ac5 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -144,10 +144,6 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &fakeKubelet{ - Kubelet: &kubelet.Kubelet{}, - hostIP: net.IPv4(127, 0, 0, 1), - }, PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ @@ -159,6 +155,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { }, }, Phase: api.PodRunning, + HostIP: "127.0.0.1", }, nil }, PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, @@ -311,7 +308,6 @@ func TestExecutorStaticPods(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &kubelet.Kubelet{}, PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ @@ -393,10 +389,6 @@ func TestExecutorFrameworkMessage(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &fakeKubelet{ - Kubelet: &kubelet.Kubelet{}, - hostIP: net.IPv4(127, 0, 0, 1), - }, PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ @@ -408,6 +400,7 @@ func 
TestExecutorFrameworkMessage(t *testing.T) { }, }, Phase: api.PodRunning, + HostIP: "127.0.0.1", }, nil }, ShutdownAlert: func() { diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 27200494e4d..97cc60052c0 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -368,17 +368,28 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( kubeletFinished := make(chan struct{}) staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") exec := executor.New(executor.Config{ - Kubelet: klet, - Updates: updates, - SourceName: MESOS_CFG_SOURCE, - APIClient: kc.KubeClient, - Docker: kc.DockerClient, - SuicideTimeout: ks.SuicideTimeout, + Updates: updates, + SourceName: MESOS_CFG_SOURCE, + APIClient: kc.KubeClient, + Docker: kc.DockerClient, + SuicideTimeout: ks.SuicideTimeout, LaunchGracePeriod: ks.LaunchGracePeriod, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { - return klet.GetRuntime().GetPodStatus(pod) + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + status, err := klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil }, StaticPodsConfigPath: staticPodsConfigPath, PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), From efdd72602795e31e5940aae537e74503c9d93516 Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Fri, 21 Aug 2015 16:50:54 +0200 Subject: [PATCH 04/12] Un-knot static pods creation in executor --- contrib/mesos/pkg/executor/executor.go | 1 - contrib/mesos/pkg/executor/executor_test.go | 16 +++++----- contrib/mesos/pkg/executor/service/service.go | 29 ++++++++++++------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 28defa1f32b..d44b9b2248c 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 0c8a8653ac5..d2c908c5620 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -144,7 +144,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -300,7 +300,6 @@ func TestExecutorStaticPods(t *testing.T) { defer os.RemoveAll(staticPodsConfigPath) mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init @@ -308,7 +307,7 @@ func TestExecutorStaticPods(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - PodStatusFunc: func(kl 
KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -325,10 +324,11 @@ func TestExecutorStaticPods(t *testing.T) { PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) + + // register static pod source hostname := "h1" - go executor.InitializeStaticPodsSource(func() { - kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates) - }) + fileSourceUpdates := make(chan interface{}, 1024) + kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, fileSourceUpdates) // create ExecutorInfo with static pod zip in data field executorInfo := mesosutil.NewExecutorInfo( @@ -350,7 +350,7 @@ func TestExecutorStaticPods(t *testing.T) { select { case <-timeout: t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods)) - case update, ok := <-updates: + case update, ok := <-fileSourceUpdates: if !ok { return } @@ -389,7 +389,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 97cc60052c0..40989f8c43e 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -363,10 +363,16 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( return nil, nil, err } + // create static pods directory + staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") + err = os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return nil, nil, err + } + //TODO(jdef) either configure Watch here with 
something useful, or else // get rid of it from executor.Config kubeletFinished := make(chan struct{}) - staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") exec := executor.New(executor.Config{ Updates: updates, SourceName: MESOS_CFG_SOURCE, @@ -395,13 +401,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), }) - go exec.InitializeStaticPodsSource(func() { - // Create file source only when we are called back. Otherwise, it is never marked unseen. - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - }) - k := &kubeletExecutor{ Kubelet: klet, address: ks.Address, @@ -423,12 +422,20 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( k.driver = driver } - log.V(2).Infof("Initialize executor driver...") - k.BirthCry() + k.StartGarbageCollection() + + log.V(2).Infof("Initialize executor driver...") exec.Init(k.driver) - k.StartGarbageCollection() + <- exec.InitialRegComplete() + + // from here the executor is registered with the Mesos master + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := pc.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) return k, pc, nil } From 93ae257af4e9a94b0fd3598f68a55b24a741d3ec Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Fri, 21 Aug 2015 17:23:23 +0200 Subject: [PATCH 05/12] Consolidate executor driver initialization code --- contrib/mesos/pkg/executor/service/service.go | 55 ++++++++----------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 40989f8c43e..271e765aaee 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -25,7 +25,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "time" log "github.com/golang/glog" @@ -257,6 +256,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { return err } + // start health check server if s.HealthzPort > 0 { healthz.DefaultHealthz() go util.Until(func() { @@ -401,6 +401,29 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), }) + // initialize driver and initialize the executor with it + dconfig := bindings.DriverConfig{ + Executor: exec, + HostnameOverride: ks.HostnameOverride, + BindingAddress: ks.Address, + } + driver, err := bindings.NewMesosExecutorDriver(dconfig) + if err != nil { + log.Fatalf("failed to create executor driver: %v", err) + } + log.V(2).Infof("Initialize executor driver...") + exec.Init(driver) + + // start the driver + go func() { + if _, err := driver.Run(); err != nil { + log.Fatalf("executor driver failed: %v", err) + } + log.Info("executor Run completed") + }() + + <- exec.InitialRegComplete() + k := &kubeletExecutor{ Kubelet: klet, address: ks.Address, @@ -411,27 +434,9 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( clientConfig: clientConfig, } - dconfig := bindings.DriverConfig{ - Executor: exec, - HostnameOverride: ks.HostnameOverride, - BindingAddress: ks.Address, - } - if driver, err := 
bindings.NewMesosExecutorDriver(dconfig); err != nil { - log.Fatalf("failed to create executor driver: %v", err) - } else { - k.driver = driver - } - k.BirthCry() k.StartGarbageCollection() - log.V(2).Infof("Initialize executor driver...") - exec.Init(k.driver) - - <- exec.InitialRegComplete() - - // from here the executor is registered with the Mesos master - // create static-pods directory file source log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) fileSourceUpdates := pc.Channel(kubetypes.FileSource) @@ -443,8 +448,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( // kubelet decorator type kubeletExecutor struct { *kubelet.Kubelet - initialize sync.Once - driver bindings.ExecutorDriver address net.IP dockerClient dockertools.DockerInterface hks hyperkube.Interface @@ -454,16 +457,6 @@ type kubeletExecutor struct { } func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { - // this func could be called many times, depending how often the HTTP server crashes, - // so only execute certain initialization procs once - kl.initialize.Do(func() { - go func() { - if _, err := kl.driver.Run(); err != nil { - log.Fatalf("executor driver failed: %v", err) - } - log.Info("executor Run completed") - }() - }) log.Infof("Starting kubelet server...") kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers) } From a60df400fd7a577913620544dfa0a9c55c1a1039 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 21 Aug 2015 18:30:29 +0200 Subject: [PATCH 06/12] Decouple executor initialization from kubelet This patch reduces the dependencies of the executor on the kubelet. This makes it possible to launch the kubelet after the executor. This considerably reduces the complexity of the startup code. 
Moreover, this work is a requirement to use a standalone kubelet some day. --- contrib/mesos/pkg/executor/executor.go | 60 +++-- contrib/mesos/pkg/executor/executor_test.go | 50 ++-- contrib/mesos/pkg/executor/mock_test.go | 6 +- contrib/mesos/pkg/executor/service/service.go | 214 ++++++++++-------- 4 files changed, 176 insertions(+), 154 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index d44b9b2248c..14d5558df56 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -95,14 +95,14 @@ type podStatusFunc func() (*api.PodStatus, error) // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - updateChan chan<- interface{} // to send pod config updates to the kubelet + updateChan chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown state stateType tasks map[string]*kuberTask pods map[string]*api.Pod lock sync.Mutex - sourcename string client *client.Client - done chan struct{} // signals shutdown + terminate chan struct{} // signals that the executor should shutdown + registered chan struct{} // closed when registerd outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver dockerClient dockertools.DockerInterface suicideWatch suicideWatcher @@ -113,14 +113,12 @@ type KubernetesExecutor struct { exitFunc func(int) podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string - initialRegComplete chan struct{} podController *framework.Controller launchGracePeriod time.Duration } type Config struct { - Updates chan<- interface{} // to send pod config updates to the kubelet - SourceName string + Updates chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet APIClient *client.Client Docker dockertools.DockerInterface ShutdownAlert func() @@ -144,9 +142,8 @@ func New(config Config) *KubernetesExecutor { state: disconnectedState, tasks: 
make(map[string]*kuberTask), pods: make(map[string]*api.Pod), - sourcename: config.SourceName, client: config.APIClient, - done: make(chan struct{}), + terminate: make(chan struct{}), outgoing: make(chan func() (mesos.Status, error), 1024), dockerClient: config.Docker, suicideTimeout: config.SuicideTimeout, @@ -155,7 +152,7 @@ func New(config Config) *KubernetesExecutor { shutdownAlert: config.ShutdownAlert, exitFunc: config.ExitFunc, podStatusFunc: config.PodStatusFunc, - initialRegComplete: make(chan struct{}), + registered: make(chan struct{}), staticPodsConfigPath: config.StaticPodsConfigPath, launchGracePeriod: config.LaunchGracePeriod, } @@ -185,26 +182,24 @@ func New(config Config) *KubernetesExecutor { return k } -func (k *KubernetesExecutor) InitialRegComplete() <-chan struct{} { - return k.initialRegComplete +// InitiallyRegistered returns a channel which is closed when the executor is +// registered with the Mesos master. +func (k *KubernetesExecutor) InitiallyRegistered() <-chan struct{} { + return k.registered } func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) - go k.podController.Run(k.done) + go k.podController.Run(k.terminate) go k.sendLoop() //TODO(jdef) monitor kubeletFinished and shutdown if it happens } -func (k *KubernetesExecutor) Done() <-chan struct{} { - return k.done -} - func (k *KubernetesExecutor) isDone() bool { select { - case <-k.done: + case <-k.terminate: return true default: return false @@ -234,7 +229,12 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } } - k.initialRegistration.Do(k.onInitialRegistration) + k.updateChan <- kubetypes.PodUpdate{ + Pods: []*api.Pod{}, + Op: kubetypes.SET, + } + + close(k.registered) } // Reregistered is called when the executor is successfully re-registered with the slave. 
@@ -255,12 +255,6 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI } } - k.initialRegistration.Do(k.onInitialRegistration) -} - -func (k *KubernetesExecutor) onInitialRegistration() { - defer close(k.initialRegComplete) - // emit an empty update to allow the mesos "source" to be marked as seen k.lock.Lock() defer k.lock.Unlock() @@ -268,9 +262,8 @@ func (k *KubernetesExecutor) onInitialRegistration() { return } k.updateChan <- kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: k.sourcename, + Pods: []*api.Pod{}, + Op: kubetypes.SET, } } @@ -818,7 +811,8 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) { (&k.state).transitionTo(terminalState) // signal to all listeners that this KubeletExecutor is done! - close(k.done) + close(k.terminate) + close(k.updateChan) if k.shutdownAlert != nil { func() { @@ -892,7 +886,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendStatusUpdate(status) } } @@ -900,7 +894,7 @@ func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status * func (k *KubernetesExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendFrameworkMessage(msg) } } @@ -910,12 +904,12 @@ func (k *KubernetesExecutor) sendLoop() { defer log.V(1).Info("sender loop exiting") for { select { - case <-k.done: + case <-k.terminate: return default: if !k.isConnected() { select { - case <-k.done: + case <-k.terminate: case <-time.After(1 * time.Second): } continue @@ -935,7 +929,7 @@ func (k *KubernetesExecutor) sendLoop() { } // attempt to re-queue the sender select { - case <-k.done: 
+ case <-k.terminate: case k.outgoing <- sender: } } diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index d2c908c5620..a91ca55c304 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -63,18 +63,14 @@ func TestExecutorRegister(t *testing.T) { executor.Registered(mockDriver, nil, nil, nil) initialPodUpdate := kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: executor.sourcename, + Pods: []*api.Pod{}, + Op: kubetypes.SET, } receivedInitialPodUpdate := false select { - case m := <-updates: - update, ok := m.(kubetypes.PodUpdate) - if ok { - if reflect.DeepEqual(initialPodUpdate, update) { - receivedInitialPodUpdate = true - } + case update := <-updates: + if reflect.DeepEqual(initialPodUpdate, update) { + receivedInitialPodUpdate = true } case <-time.After(time.Second): } @@ -136,7 +132,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { defer testApiServer.server.Close() mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: updates, @@ -154,7 +150,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, HostIP: "127.0.0.1", }, nil }, @@ -205,9 +201,8 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { gotPodUpdate := false select { - case m := <-updates: - update, ok := m.(kubetypes.PodUpdate) - if ok && len(update.Pods) == 1 { + case update := <-updates: + if len(update.Pods) == 1 { gotPodUpdate = true } case <-time.After(time.Second): @@ -302,7 +297,7 @@ func TestExecutorStaticPods(t *testing.T) { mockDriver := &MockExecutorDriver{} config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init + Updates: make(chan 
kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -384,7 +379,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { kubeletFinished := make(chan struct{}) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: make(chan kubetypes.PodUpdate, 1024), APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -399,7 +394,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, HostIP: "127.0.0.1", }, nil }, @@ -560,9 +555,10 @@ func TestExecutorShutdown(t *testing.T) { mockDriver := &MockExecutorDriver{} kubeletFinished := make(chan struct{}) var exitCalled int32 = 0 + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: updates, ShutdownAlert: func() { close(kubeletFinished) }, @@ -586,11 +582,21 @@ func TestExecutorShutdown(t *testing.T) { assert.Equal(t, true, executor.isDone(), "executor should be in Done state after Shutdown") - select { - case <-executor.Done(): - default: - t.Fatal("done channel should be closed after shutdown") + // channel should be closed now, only a constant number of updates left + num := len(updates) +drainLoop: + for { + select { + case _, ok := <-updates: + if !ok { + break drainLoop + } + num -= 1 + default: + t.Fatal("Updates chan should be closed after Shutdown") + } } + assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown") assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0, "the executor should call its ExitFunc when it is ready to close down") diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index 727a66336bb..f73039ff7dd 
100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "k8s.io/kubernetes/pkg/api" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/dockertools" ) @@ -65,13 +66,12 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status return args.Get(0).(mesosproto.Status), args.Error(1) } -func NewTestKubernetesExecutor() (*KubernetesExecutor, chan interface{}) { - updates := make(chan interface{}, 1024) +func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) { + updates := make(chan kubetypes.PodUpdate, 1024) return New(Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: updates, PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, - SourceName: "executor_test", }), updates } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 271e765aaee..45488830f9f 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -18,13 +18,13 @@ package service import ( "fmt" - "math/rand" "net" "net/http" "os" "path/filepath" "strconv" "strings" + "sync" "time" log "github.com/golang/glog" @@ -35,10 +35,8 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/credentialprovider" - "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -50,6 +48,7 @@ import ( utilio "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/oom" + "k8s.io/kubernetes/pkg/util/rand" ) const ( @@ -62,6 +61,8 @@ type KubeletExecutorServer struct { 
*app.KubeletServer SuicideTimeout time.Duration LaunchGracePeriod time.Duration + kletLock sync.Mutex + klet *kubelet.Kubelet } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -100,6 +101,13 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { s.SystemContainer = "" s.DockerDaemonContainer = "" + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return err + } + // create apiserver client var apiclient *client.Client clientConfig, err := s.CreateAPIServerClientConfig() @@ -173,6 +181,43 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) dockerExecHandler = &dockertools.NativeExecHandler{} } + dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) + + //TODO(jdef) either configure Watch here with something useful, or else + // get rid of it from executor.Config + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + exec := executor.New(executor.Config{ + Updates: execUpdates, + APIClient: apiclient, + Docker: dockerClient, + SuicideTimeout: s.SuicideTimeout, + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + s.kletLock.Lock() + defer s.kletLock.Unlock() + + if s.klet == nil { + return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized") + } + + status, err := s.klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := s.klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil + }, + StaticPodsConfigPath: staticPodsConfigPath, + }) manifestURLHeader 
:= make(http.Header) if s.ManifestURLHeader != "" { @@ -183,6 +228,30 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { manifestURLHeader.Set(pieces[0], pieces[1]) } + // initialize driver and initialize the executor with it + dconfig := bindings.DriverConfig{ + Executor: exec, + HostnameOverride: s.HostnameOverride, + BindingAddress: s.Address, + } + driver, err := bindings.NewMesosExecutorDriver(dconfig) + if err != nil { + log.Fatalf("failed to create executor driver: %v", err) + } + log.V(2).Infof("Initialize executor driver...") + exec.Init(driver) + + // start the driver + go func() { + if _, err := driver.Run(); err != nil { + log.Fatalf("executor driver failed: %v", err) + } + log.Info("executor Run completed") + }() + + <-exec.InitiallyRegistered() + + // prepare kubelet kcfg := app.KubeletConfig{ Address: s.Address, AllowPrivileged: s.AllowPrivileged, @@ -196,7 +265,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { ContainerRuntime: s.ContainerRuntime, CPUCFSQuota: s.CPUCFSQuota, DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + DockerClient: dockerClient, DockerDaemonContainer: s.DockerDaemonContainer, DockerExecHandler: dockerExecHandler, EnableDebuggingHandlers: s.EnableDebuggingHandlers, @@ -248,9 +317,8 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { kcfg.NodeName = kcfg.Hostname kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, hks, clientConfig) + return s.createAndInitKubelet(kc, clientConfig, staticPodsConfigPath, execUpdates, kubeletFinished) }) - err = app.RunKubelet(&kcfg) if err != nil { return err @@ -282,8 +350,10 @@ func defaultBindingAddress() string { func (ks *KubeletExecutorServer) createAndInitKubelet( kc *app.KubeletConfig, - hks hyperkube.Interface, clientConfig *client.Config, 
+ staticPodsConfigPath string, + execUpdates <-chan kubetypes.PodUpdate, + kubeletDone chan<- struct{}, ) (app.KubeletBootstrap, *kconfig.PodConfig, error) { // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop @@ -304,8 +374,24 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( MaxContainers: kc.MaxContainerCount, } + // create main pod source pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder) updates := pc.Channel(MESOS_CFG_SOURCE) + executorDone := make(chan struct{}) + go func() { + // execUpdates will be closed by the executor on shutdown + defer close(executorDone) + + for u := range execUpdates { + u.Source = MESOS_CFG_SOURCE + updates <- u + } + }() + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := pc.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) klet, err := kubelet.NewMainKubelet( kc.Hostname, @@ -363,97 +449,33 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( return nil, nil, err } - // create static pods directory - staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") - err = os.Mkdir(staticPodsConfigPath, 0755) - if err != nil { - return nil, nil, err - } - - //TODO(jdef) either configure Watch here with something useful, or else - // get rid of it from executor.Config - kubeletFinished := make(chan struct{}) - exec := executor.New(executor.Config{ - Updates: updates, - SourceName: MESOS_CFG_SOURCE, - APIClient: kc.KubeClient, - Docker: kc.DockerClient, - SuicideTimeout: ks.SuicideTimeout, - LaunchGracePeriod: ks.LaunchGracePeriod, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { - status, err := klet.GetRuntime().GetPodStatus(pod) - if err != nil { - 
return nil, err - } - - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := klet.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil - }, - StaticPodsConfigPath: staticPodsConfigPath, - PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), - }) - - // initialize driver and initialize the executor with it - dconfig := bindings.DriverConfig{ - Executor: exec, - HostnameOverride: ks.HostnameOverride, - BindingAddress: ks.Address, - } - driver, err := bindings.NewMesosExecutorDriver(dconfig) - if err != nil { - log.Fatalf("failed to create executor driver: %v", err) - } - log.V(2).Infof("Initialize executor driver...") - exec.Init(driver) - - // start the driver - go func() { - if _, err := driver.Run(); err != nil { - log.Fatalf("executor driver failed: %v", err) - } - log.Info("executor Run completed") - }() - - <- exec.InitialRegComplete() + ks.kletLock.Lock() + ks.klet = klet + ks.kletLock.Unlock() k := &kubeletExecutor{ - Kubelet: klet, - address: ks.Address, - dockerClient: kc.DockerClient, - hks: hks, - kubeletFinished: kubeletFinished, - executorDone: exec.Done(), - clientConfig: clientConfig, + Kubelet: ks.klet, + address: ks.Address, + dockerClient: kc.DockerClient, + kubeletDone: kubeletDone, + clientConfig: clientConfig, + executorDone: executorDone, } k.BirthCry() k.StartGarbageCollection() - // create static-pods directory file source - log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - return k, pc, nil } // kubelet decorator type kubeletExecutor struct { *kubelet.Kubelet - address net.IP - dockerClient dockertools.DockerInterface - hks 
hyperkube.Interface - kubeletFinished chan struct{} // closed once kubelet.Run() returns - executorDone <-chan struct{} // from KubeletExecutor.Done() - clientConfig *client.Config + address net.IP + dockerClient dockertools.DockerInterface + kubeletDone chan<- struct{} // closed once kubelet.Run() returns + executorDone <-chan struct{} // closed when executor terminates + clientConfig *client.Config } func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { @@ -463,21 +485,21 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. // never returns. -func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { +func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { defer func() { - close(kl.kubeletFinished) + close(kl.kubeletDone) util.HandleCrash() log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity // important: never return! this is in our contract select {} }() - // push updates through a closable pipe. when the executor indicates shutdown - // via Done() we want to stop the Kubelet from processing updates. - pipe := make(chan kubetypes.PodUpdate) + // push merged updates into another, closable update channel which is closed + // when the executor shuts down. 
+ closableUpdates := make(chan kubetypes.PodUpdate) go func() { - // closing pipe will cause our patched kubelet's syncLoop() to exit - defer close(pipe) + // closing closableUpdates will cause our patched kubelet's syncLoop() to exit + defer close(closableUpdates) pipeLoop: for { select { @@ -485,9 +507,9 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { break pipeLoop default: select { - case u := <-updates: + case u := <-mergedUpdates: select { - case pipe <- u: // noop + case closableUpdates <- u: // noop case <-kl.executorDone: break pipeLoop } @@ -498,12 +520,12 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { } }() - // we expect that Run() will complete after the pipe is closed and the + // we expect that Run() will complete after closableUpdates is closed and the // kubelet's syncLoop() has finished processing its backlog, which hopefully // will not take very long. Peeking into the future (current k8s master) it // seems that the backlog has grown from 1 to 50 -- this may negatively impact // us going forward, time will tell. - util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone) + util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) //TODO(jdef) revisit this if/when executor failover lands // Force kubelet to delete all pods. From dd5bafdba59fd3d7de67d13703e1b93677b7d4ae Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Wed, 23 Sep 2015 10:16:59 +0200 Subject: [PATCH 07/12] Decouple startup of executor and kubelet --- contrib/mesos/pkg/executor/executor.go | 83 +++---- contrib/mesos/pkg/executor/mock_test.go | 8 +- contrib/mesos/pkg/executor/service/service.go | 222 ++++++++++-------- 3 files changed, 167 insertions(+), 146 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 14d5558df56..14ba8e3433e 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -87,7 +87,7 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool { type kuberTask struct { mesosTaskInfo *mesos.TaskInfo - podName string + podName string // empty until pod is sent to kubelet and registed in KubernetesExecutor.pods } type podStatusFunc func() (*api.PodStatus, error) @@ -102,14 +102,12 @@ type KubernetesExecutor struct { lock sync.Mutex client *client.Client terminate chan struct{} // signals that the executor should shutdown - registered chan struct{} // closed when registerd outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver dockerClient dockertools.DockerInterface suicideWatch suicideWatcher suicideTimeout time.Duration shutdownAlert func() // invoked just prior to executor shutdown kubeletFinished <-chan struct{} // signals that kubelet Run() died - initialRegistration sync.Once exitFunc func(int) podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string @@ -152,7 +150,6 @@ func New(config Config) *KubernetesExecutor { shutdownAlert: config.ShutdownAlert, exitFunc: config.ExitFunc, podStatusFunc: config.PodStatusFunc, - registered: make(chan struct{}), staticPodsConfigPath: config.StaticPodsConfigPath, launchGracePeriod: config.LaunchGracePeriod, } @@ -182,12 +179,6 @@ func New(config Config) *KubernetesExecutor { return k } -// InitiallyRegistered returns a channel which is closed when the executor is -// 
registered with the Mesos master. -func (k *KubernetesExecutor) InitiallyRegistered() <-chan struct{} { - return k.registered -} - func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) @@ -206,6 +197,15 @@ func (k *KubernetesExecutor) isDone() bool { } } +// sendPodUpdate assumes that caller is holding state lock; returns true when update is sent otherwise false +func (k *KubernetesExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool { + if k.isDone() { + return false + } + k.updateChan <- *u + return true +} + // Registered is called when the executor is successfully registered with the slave. func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) { @@ -229,12 +229,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } } - k.updateChan <- kubetypes.PodUpdate{ + // emit an empty update to allow the mesos "source" to be marked as seen + k.lock.Lock() + defer k.lock.Unlock() + k.sendPodUpdate(&kubetypes.PodUpdate{ Pods: []*api.Pod{}, Op: kubetypes.SET, - } - - close(k.registered) + }) } // Reregistered is called when the executor is successfully re-registered with the slave. 
@@ -254,17 +255,6 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI log.Errorf("cannot update node labels: %v", err) } } - - // emit an empty update to allow the mesos "source" to be marked as seen - k.lock.Lock() - defer k.lock.Unlock() - if k.isDone() { - return - } - k.updateChan <- kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - } } // initializeStaticPodsSource unzips the data slice into the static-pods directory @@ -378,14 +368,10 @@ func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) { oldPod.DeletionTimestamp = pod.DeletionTimestamp oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds - if k.isDone() { - return - } - update := kubetypes.PodUpdate{ + k.sendPodUpdate(&kubetypes.PodUpdate{ Op: kubetypes.UPDATE, Pods: []*api.Pod{oldPod}, - } - k.updateChan <- update + }) } } } @@ -538,7 +524,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s k.lock.Lock() defer k.lock.Unlock() - // Add the task. 
+ // find task task, found := k.tasks[taskId] if !found { log.V(1).Infof("task %v not found, probably killed: aborting launch, reporting lost", taskId) @@ -548,21 +534,23 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s //TODO(jdef) check for duplicate pod name, if found send TASK_ERROR - // from here on, we need to delete containers associated with the task - // upon it going into a terminal state + // send the new pod to the kubelet which will spin it up + ok := k.sendPodUpdate(&kubetypes.PodUpdate{ + Op: kubetypes.ADD, + Pods: []*api.Pod{pod}, + }) + if !ok { + return // executor is terminating, cancel launch + } + + // mark task as sent by setting the podName and register the sent pod task.podName = podFullName k.pods[podFullName] = pod - // send the new pod to the kubelet which will spin it up - if k.isDone() { - return - } - update := kubetypes.PodUpdate{ - Op: kubetypes.ADD, - Pods: []*api.Pod{pod}, - } - k.updateChan <- update + // From here on, we need to delete containers associated with the task upon + // it going into a terminal state. + // report task is starting to scheduler statusUpdate := &mesos.TaskStatus{ TaskId: mutil.NewTaskID(taskId), State: mesos.TaskState_TASK_STARTING.Enum(), @@ -575,7 +563,6 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s psf := podStatusFunc(func() (*api.PodStatus, error) { return k.podStatusFunc(pod) }) - go k._launchTask(driver, taskId, podFullName, psf) } @@ -751,14 +738,10 @@ func (k *KubernetesExecutor) removePodTask(driver bindings.ExecutorDriver, tid, delete(k.pods, pid) // tell the kubelet to remove the pod - if k.isDone() { - return - } - update := kubetypes.PodUpdate{ + k.sendPodUpdate(&kubetypes.PodUpdate{ Op: kubetypes.REMOVE, Pods: []*api.Pod{pod}, - } - k.updateChan <- update + }) } // TODO(jdef): ensure that the update propagates, perhaps return a signal chan? 
k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason)) diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index f73039ff7dd..1a8b35ac6a2 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -23,8 +23,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "k8s.io/kubernetes/pkg/api" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/dockertools" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) type MockExecutorDriver struct { @@ -69,9 +69,9 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) { updates := make(chan kubetypes.PodUpdate, 1024) return New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: updates, - PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: updates, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, }), updates } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 45488830f9f..8528c187b7f 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -35,8 +35,10 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/credentialprovider" + "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -61,8 +63,9 @@ type KubeletExecutorServer struct { *app.KubeletServer SuicideTimeout time.Duration LaunchGracePeriod time.Duration - kletLock sync.Mutex - klet *kubelet.Kubelet + + kletLock 
sync.Mutex // TODO(sttts): remove necessity to access the kubelet from the executor + klet *kubelet.Kubelet } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -87,27 +90,7 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.") } -// Run runs the specified KubeletExecutorServer. -func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { - rand.Seed(time.Now().UTC().UnixNano()) - - oomAdjuster := oom.NewOOMAdjuster() - if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil { - log.Info(err) - } - - // empty string for the docker and system containers (= cgroup paths). This - // stops the kubelet taking any control over other system processes. - s.SystemContainer = "" - s.DockerDaemonContainer = "" - - // create static pods directory - staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") - err := os.Mkdir(staticPodsConfigPath, 0755) - if err != nil { - return err - } - +func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, staticPodsConfigPath string) error { // create apiserver client var apiclient *client.Client clientConfig, err := s.CreateAPIServerClientConfig() @@ -119,6 +102,89 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { // back to the apiserver log.Fatalf("No API client: %v", err) } + exec := executor.New(executor.Config{ + Updates: execUpdates, + APIClient: apiclient, + Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + SuicideTimeout: s.SuicideTimeout, + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + s.kletLock.Lock() + defer s.kletLock.Unlock() + + if s.klet == nil { + return nil, fmt.Errorf("PodStatucFunc called 
before kubelet is initialized") + } + + status, err := s.klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := s.klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil + }, + StaticPodsConfigPath: staticPodsConfigPath, + PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll, + fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride), + ), + }) + + // initialize driver and initialize the executor with it + dconfig := bindings.DriverConfig{ + Executor: exec, + HostnameOverride: s.HostnameOverride, + BindingAddress: s.Address, + } + driver, err := bindings.NewMesosExecutorDriver(dconfig) + if err != nil { + return fmt.Errorf("failed to create executor driver: %v", err) + } + log.V(2).Infof("Initialize executor driver...") + exec.Init(driver) + + // start the driver + go func() { + if _, err := driver.Run(); err != nil { + log.Fatalf("executor driver failed: %v", err) + } + log.Info("executor Run completed") + }() + + return nil +} + +// Run runs the specified KubeletExecutorServer. +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, staticPodsConfigPath string) error { + // empty string for the docker and system containers (= cgroup paths). This + // stops the kubelet taking any control over other system processes. 
+ s.SystemContainer = "" + s.DockerDaemonContainer = "" + + oomAdjuster := oom.NewOOMAdjuster() + if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil { + log.Info(err) + } + + dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) + + // create apiserver client + var apiclient *client.Client + clientConfig, err := s.CreateAPIServerClientConfig() + if err == nil { + apiclient, err = client.New(clientConfig) + } + if err != nil { + // required for k8sm since we need to send api.Binding information back to the apiserver + log.Fatalf("No API client: %v", err) + } log.Infof("Using root directory: %v", s.RootDirectory) credentialprovider.SetPreferredDockercfgPath(s.RootDirectory) @@ -138,6 +204,15 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { RootFreeDiskMB: s.LowDiskSpaceThresholdMB, } + manifestURLHeader := make(http.Header) + if s.ManifestURLHeader != "" { + pieces := strings.Split(s.ManifestURLHeader, ":") + if len(pieces) != 2 { + return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) + } + manifestURLHeader.Set(pieces[0], pieces[1]) + } + //TODO(jdef) intentionally NOT initializing a cloud provider here since: //(a) the kubelet doesn't actually use it //(b) we don't need to create N-kubelet connections to zookeeper for no good reason @@ -181,75 +256,6 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) dockerExecHandler = &dockertools.NativeExecHandler{} } - dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) - - //TODO(jdef) either configure Watch here with something useful, or else - // get rid of it from executor.Config - kubeletFinished := make(chan struct{}) - execUpdates := make(chan kubetypes.PodUpdate, 1) - exec := executor.New(executor.Config{ - Updates: execUpdates, - APIClient: apiclient, - 
Docker: dockerClient, - SuicideTimeout: s.SuicideTimeout, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { - s.kletLock.Lock() - defer s.kletLock.Unlock() - - if s.klet == nil { - return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized") - } - - status, err := s.klet.GetRuntime().GetPodStatus(pod) - if err != nil { - return nil, err - } - - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := s.klet.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil - }, - StaticPodsConfigPath: staticPodsConfigPath, - }) - - manifestURLHeader := make(http.Header) - if s.ManifestURLHeader != "" { - pieces := strings.Split(s.ManifestURLHeader, ":") - if len(pieces) != 2 { - return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) - } - manifestURLHeader.Set(pieces[0], pieces[1]) - } - - // initialize driver and initialize the executor with it - dconfig := bindings.DriverConfig{ - Executor: exec, - HostnameOverride: s.HostnameOverride, - BindingAddress: s.Address, - } - driver, err := bindings.NewMesosExecutorDriver(dconfig) - if err != nil { - log.Fatalf("failed to create executor driver: %v", err) - } - log.V(2).Infof("Initialize executor driver...") - exec.Init(driver) - - // start the driver - go func() { - if _, err := driver.Run(); err != nil { - log.Fatalf("executor driver failed: %v", err) - } - log.Info("executor Run completed") - }() - - <-exec.InitiallyRegistered() // prepare kubelet kcfg := app.KubeletConfig{ @@ -335,6 +341,37 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { }, 5*time.Second, util.NeverStop) } + return nil +} + +// Run runs the specified KubeletExecutorServer. 
+func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { + rand.Seed(time.Now().UTC().UnixNano()) + + // create shared channels + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return err + } + + // start executor + err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath) + if err != nil { + return err + } + + // start kubelet + err = s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath) + if err != nil { + close(kubeletFinished) // tell executor + return err + } + // block until executor is shut down or commits shutdown select {} } @@ -531,3 +568,4 @@ func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { // Force kubelet to delete all pods. kl.HandlePodDeletions(kl.GetPods()) } + From 0feb1bceb5ea8ddf2727235f73b04f355ab221f3 Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Wed, 23 Sep 2015 09:38:00 +0200 Subject: [PATCH 08/12] Re-use KubeletServer.KubeletConfig and KubeletServer.Run --- contrib/mesos/pkg/executor/service/service.go | 261 +++--------------- 1 file changed, 39 insertions(+), 222 deletions(-) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 8528c187b7f..d9819d55257 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -19,11 +19,8 @@ package service import ( "fmt" "net" - "net/http" "os" "path/filepath" - "strconv" - "strings" "sync" "time" @@ -37,20 +34,13 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" - "k8s.io/kubernetes/pkg/kubelet/cadvisor" kconfig "k8s.io/kubernetes/pkg/kubelet/config" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + kubeletContainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/util" - utilio "k8s.io/kubernetes/pkg/util/io" - "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/util/oom" - "k8s.io/kubernetes/pkg/util/rand" ) const ( @@ -90,18 +80,8 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. 
Zero disables launch cancellation.") } -func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, staticPodsConfigPath string) error { - // create apiserver client - var apiclient *client.Client - clientConfig, err := s.CreateAPIServerClientConfig() - if err == nil { - apiclient, err = client.New(clientConfig) - } - if err != nil { - // required for k8sm since we need to send api.Binding information - // back to the apiserver - log.Fatalf("No API client: %v", err) - } +func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, + staticPodsConfigPath string, apiclient *client.Client) error { exec := executor.New(executor.Config{ Updates: execUpdates, APIClient: apiclient, @@ -161,19 +141,40 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda return nil } -// Run runs the specified KubeletExecutorServer. -func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, staticPodsConfigPath string) error { - // empty string for the docker and system containers (= cgroup paths). This - // stops the kubelet taking any control over other system processes. 
- s.SystemContainer = "" - s.DockerDaemonContainer = "" - - oomAdjuster := oom.NewOOMAdjuster() - if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil { - log.Info(err) +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, + staticPodsConfigPath string, apiclient *client.Client) error { + kcfg, err := s.UnsecuredKubeletConfig() + if err == nil { + kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { + return s.createAndInitKubelet(kc, staticPodsConfigPath, execUpdates, kubeletFinished) + } + kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup + kcfg.Hostname = kcfg.HostnameOverride + kcfg.KubeClient = apiclient + kcfg.NodeName = kcfg.HostnameOverride + kcfg.StandaloneMode = false + kcfg.SystemContainer = "" // don't take control over other system processes. + err = s.KubeletServer.Run(kcfg) } - dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) + if err != nil { + close(kubeletFinished) + } + return err +} + +// Run runs the specified KubeletExecutorServer. 
+func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { + // create shared channels + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return err + } // create apiserver client var apiclient *client.Client @@ -186,194 +187,14 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat log.Fatalf("No API client: %v", err) } - log.Infof("Using root directory: %v", s.RootDirectory) - credentialprovider.SetPreferredDockercfgPath(s.RootDirectory) - - cAdvisorInterface, err := cadvisor.New(s.CAdvisorPort) - if err != nil { - return err - } - - imageGCPolicy := kubelet.ImageGCPolicy{ - HighThresholdPercent: s.ImageGCHighThresholdPercent, - LowThresholdPercent: s.ImageGCLowThresholdPercent, - } - - diskSpacePolicy := kubelet.DiskSpacePolicy{ - DockerFreeDiskMB: s.LowDiskSpaceThresholdMB, - RootFreeDiskMB: s.LowDiskSpaceThresholdMB, - } - - manifestURLHeader := make(http.Header) - if s.ManifestURLHeader != "" { - pieces := strings.Split(s.ManifestURLHeader, ":") - if len(pieces) != 2 { - return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) - } - manifestURLHeader.Set(pieces[0], pieces[1]) - } - - //TODO(jdef) intentionally NOT initializing a cloud provider here since: - //(a) the kubelet doesn't actually use it - //(b) we don't need to create N-kubelet connections to zookeeper for no good reason - //cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) - //log.Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) - - hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ",")) - if err != nil { - return err - } - - hostPIDSources, err 
:= kubetypes.GetValidatedSources(strings.Split(s.HostPIDSources, ",")) - if err != nil { - return err - } - - hostIPCSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostIPCSources, ",")) - if err != nil { - return err - } - - tlsOptions, err := s.InitializeTLS() - if err != nil { - return err - } - mounter := mount.New() - if s.Containerized { - log.V(2).Info("Running kubelet in containerized mode (experimental)") - mounter = &mount.NsenterMounter{} - } - - var writer utilio.Writer = &utilio.StdWriter{} - var dockerExecHandler dockertools.ExecHandler - switch s.DockerExecHandlerName { - case "native": - dockerExecHandler = &dockertools.NativeExecHandler{} - case "nsenter": - writer = &utilio.NsenterWriter{} - dockerExecHandler = &dockertools.NsenterExecHandler{} - default: - log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) - dockerExecHandler = &dockertools.NativeExecHandler{} - } - - // prepare kubelet - kcfg := app.KubeletConfig{ - Address: s.Address, - AllowPrivileged: s.AllowPrivileged, - CAdvisorInterface: cAdvisorInterface, - CgroupRoot: s.CgroupRoot, - Cloud: nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl - ClusterDNS: s.ClusterDNS, - ClusterDomain: s.ClusterDomain, - // ConfigFile: "" - ConfigureCBR0: s.ConfigureCBR0, - ContainerRuntime: s.ContainerRuntime, - CPUCFSQuota: s.CPUCFSQuota, - DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockerClient, - DockerDaemonContainer: s.DockerDaemonContainer, - DockerExecHandler: dockerExecHandler, - EnableDebuggingHandlers: s.EnableDebuggingHandlers, - EnableServer: s.EnableServer, - EventBurst: s.EventBurst, - EventRecordQPS: s.EventRecordQPS, - FileCheckFrequency: s.FileCheckFrequency, - HostnameOverride: s.HostnameOverride, - HostNetworkSources: hostNetworkSources, - HostPIDSources: hostPIDSources, - HostIPCSources: hostIPCSources, - // 
HTTPCheckFrequency - ImageGCPolicy: imageGCPolicy, - KubeClient: apiclient, - // ManifestURL: "" - ManifestURLHeader: manifestURLHeader, - MasterServiceNamespace: s.MasterServiceNamespace, - MaxContainerCount: s.MaxContainerCount, - MaxOpenFiles: s.MaxOpenFiles, - MaxPerPodContainerCount: s.MaxPerPodContainerCount, - MaxPods: s.MaxPods, - MinimumGCAge: s.MinimumGCAge, - Mounter: mounter, - NetworkPluginName: s.NetworkPluginName, - NetworkPlugins: app.ProbeNetworkPlugins(s.NetworkPluginDir), - NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency, - OOMAdjuster: oomAdjuster, - OSInterface: kubecontainer.RealOS{}, - PodCIDR: s.PodCIDR, - PodInfraContainerImage: s.PodInfraContainerImage, - Port: s.Port, - ReadOnlyPort: s.ReadOnlyPort, - RegisterNode: s.RegisterNode, - RegistryBurst: s.RegistryBurst, - RegistryPullQPS: s.RegistryPullQPS, - ResolverConfig: s.ResolverConfig, - ResourceContainer: s.ResourceContainer, - RootDirectory: s.RootDirectory, - Runonce: s.RunOnce, - // StandaloneMode: false - StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout, - SyncFrequency: s.SyncFrequency, - SystemContainer: s.SystemContainer, - TLSOptions: tlsOptions, - VolumePlugins: app.ProbeVolumePlugins(), - Writer: writer, - } - - kcfg.NodeName = kcfg.Hostname - - kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, clientConfig, staticPodsConfigPath, execUpdates, kubeletFinished) - }) - err = app.RunKubelet(&kcfg) - if err != nil { - return err - } - - // start health check server - if s.HealthzPort > 0 { - healthz.DefaultHealthz() - go util.Until(func() { - err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil) - if err != nil { - log.Errorf("Starting health server failed: %v", err) - } - }, 5*time.Second, util.NeverStop) - } - - return nil -} - -// Run runs the specified KubeletExecutorServer. 
-func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { - rand.Seed(time.Now().UTC().UnixNano()) - - // create shared channels - kubeletFinished := make(chan struct{}) - execUpdates := make(chan kubetypes.PodUpdate, 1) - - // create static pods directory - staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") - err := os.Mkdir(staticPodsConfigPath, 0755) - if err != nil { - return err - } - // start executor - err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath) + err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) if err != nil { return err } - // start kubelet - err = s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath) - if err != nil { - close(kubeletFinished) // tell executor - return err - } - - // block until executor is shut down or commits shutdown - select {} + // start kubelet, blocking + return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) } func defaultBindingAddress() string { @@ -387,12 +208,10 @@ func defaultBindingAddress() string { func (ks *KubeletExecutorServer) createAndInitKubelet( kc *app.KubeletConfig, - clientConfig *client.Config, staticPodsConfigPath string, execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{}, ) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations // TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods @@ -405,7 +224,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( kubeClient = kc.KubeClient } - gcPolicy := kubecontainer.ContainerGCPolicy{ + gcPolicy := kubeletContainer.ContainerGCPolicy{ MinAge: kc.MinimumGCAge, MaxPerPodContainer: kc.MaxPerPodContainerCount, MaxContainers: kc.MaxContainerCount, @@ -495,7 +314,6 @@ func (ks *KubeletExecutorServer) 
createAndInitKubelet( address: ks.Address, dockerClient: kc.DockerClient, kubeletDone: kubeletDone, - clientConfig: clientConfig, executorDone: executorDone, } @@ -512,7 +330,6 @@ type kubeletExecutor struct { dockerClient dockertools.DockerInterface kubeletDone chan<- struct{} // closed once kubelet.Run() returns executorDone <-chan struct{} // closed when executor terminates - clientConfig *client.Config } func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { From 4ec703174b9f1c78d2fba8faffcf41935f8430ec Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 23 Sep 2015 10:05:00 +0200 Subject: [PATCH 09/12] Simplify executorKubelet decorator and move to kubelet.go - simplify - rename kubeletExecutor -> executorKubelet - move executorKubelet code into kubelet.go - remove duplicated executorKubelet.ListenAndServe - fix executorKubelet.Run comment --- contrib/mesos/pkg/executor/service/kubelet.go | 83 +++++++++++++++++++ contrib/mesos/pkg/executor/service/service.go | 6 +- 2 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 contrib/mesos/pkg/executor/service/kubelet.go diff --git a/contrib/mesos/pkg/executor/service/kubelet.go b/contrib/mesos/pkg/executor/service/kubelet.go new file mode 100644 index 00000000000..191f0e00e73 --- /dev/null +++ b/contrib/mesos/pkg/executor/service/kubelet.go @@ -0,0 +1,83 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + log "github.com/golang/glog" + "k8s.io/kubernetes/pkg/kubelet" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/util" +) + +// executorKubelet decorates the kubelet with a Run function that notifies the +// executor by closing kubeletDone before entering blocking state. +type executorKubelet struct { + *kubelet.Kubelet + kubeletDone chan<- struct{} // closed once kubelet.Run() returns + executorDone <-chan struct{} // closed when executor terminates +} + +// Run runs the main kubelet loop, closing the kubeletFinished chan when the +// loop exits. Like the upstream Run, it will never return. +func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) { + defer func() { + // When this Run function is called, we close it here. + // Otherwise, KubeletExecutorServer.runKubelet will. + close(kl.kubeletDone) + util.HandleCrash() + log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity + // important: never return! this is in our contract + select {} + }() + + // push merged updates into another, closable update channel which is closed + // when the executor shuts down. + closableUpdates := make(chan kubetypes.PodUpdate) + go func() { + // closing closableUpdates will cause our patched kubelet's syncLoop() to exit + defer close(closableUpdates) + pipeLoop: + for { + select { + case <-kl.executorDone: + break pipeLoop + default: + select { + case u := <-mergedUpdates: + select { + case closableUpdates <- u: // noop + case <-kl.executorDone: + break pipeLoop + } + case <-kl.executorDone: + break pipeLoop + } + } + } + }() + + // we expect that Run() will complete after closableUpdates is closed and the + // kubelet's syncLoop() has finished processing its backlog, which hopefully + // will not take very long. 
Peeking into the future (current k8s master) it + // seems that the backlog has grown from 1 to 50 -- this may negatively impact + // us going forward, time will tell. + util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) + + //TODO(jdef) revisit this if/when executor failover lands + // Force kubelet to delete all pods. + kl.HandlePodDeletions(kl.GetPods()) +} diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index d9819d55257..4841f0db452 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -309,10 +309,9 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( ks.klet = klet ks.kletLock.Unlock() - k := &kubeletExecutor{ + // decorate kubelet such that it shuts down when the executor is + k := &executorKubelet{ Kubelet: ks.klet, - address: ks.Address, - dockerClient: kc.DockerClient, kubeletDone: kubeletDone, executorDone: executorDone, } @@ -385,4 +384,3 @@ func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { // Force kubelet to delete all pods. kl.HandlePodDeletions(kl.GetPods()) } - From d74950cfb9358b76307ce3904fb7d848295885ec Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Wed, 23 Sep 2015 13:07:03 +0200 Subject: [PATCH 10/12] Re-use CreateAndInitKubelet --- contrib/mesos/pkg/executor/service/service.go | 240 ++++-------------- 1 file changed, 55 insertions(+), 185 deletions(-) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 4841f0db452..9290fa8b5d3 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -37,10 +37,8 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet" kconfig "k8s.io/kubernetes/pkg/kubelet/config" - kubeletContainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/util" ) const ( @@ -141,24 +139,75 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda return nil } -func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{}, staticPodsConfigPath string, apiclient *client.Client) error { kcfg, err := s.UnsecuredKubeletConfig() if err == nil { + // apply Messo specific settings + executorDone := make(chan struct{}) kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, staticPodsConfigPath, execUpdates, kubeletFinished) + k, pc, err := app.CreateAndInitKubelet(kc) + if err != nil { + return k, pc, err + } + + klet := k.(*kubelet.Kubelet) + + s.kletLock.Lock() + s.klet = klet + s.kletLock.Unlock() + + // decorate kubelet such that it shuts down when the executor is + decorated := &executorKubelet{ + Kubelet: klet, + kubeletDone: kubeletDone, + executorDone: executorDone, + } + + return decorated, pc, nil } kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a 
cgroup kcfg.Hostname = kcfg.HostnameOverride kcfg.KubeClient = apiclient kcfg.NodeName = kcfg.HostnameOverride + kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source kcfg.StandaloneMode = false kcfg.SystemContainer = "" // don't take control over other system processes. + if kcfg.Cloud != nil { + // fail early and hard because having the cloud provider loaded would go unnoticed, + // but break bigger cluster because accessing the state.json from every slave kills the master. + panic("cloud provider must not be set") + } + + // create main pod source + updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE) + go func() { + // execUpdates will be closed by the executor on shutdown + defer close(executorDone) + + for u := range execUpdates { + u.Source = MESOS_CFG_SOURCE + updates <- u + } + }() + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates) + + // run the kubelet, until execUpdates is closed + // NOTE: because kcfg != nil holds, the upstream Run function will not + // initialize the cloud provider. We explicitly wouldn't want + // that because then every kubelet instance would query the master + // state.json which does not scale. err = s.KubeletServer.Run(kcfg) } if err != nil { - close(kubeletFinished) + // close the channel here. When Run returns without error, the executorKubelet is + // responsible to do this. If it returns with an error, we are responsible here. 
+ close(kubeletDone) } return err } @@ -184,7 +233,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { } if err != nil { // required for k8sm since we need to send api.Binding information back to the apiserver - log.Fatalf("No API client: %v", err) + return fmt.Errorf("cannot create API client: %v", err) } // start executor @@ -205,182 +254,3 @@ func defaultBindingAddress() string { return libProcessIP } } - -func (ks *KubeletExecutorServer) createAndInitKubelet( - kc *app.KubeletConfig, - staticPodsConfigPath string, - execUpdates <-chan kubetypes.PodUpdate, - kubeletDone chan<- struct{}, -) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop - // up into "per source" synchronizations - // TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods - // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing - // a nil pointer to it when what we really want is a nil interface. 
- var kubeClient client.Interface - if kc.KubeClient == nil { - kubeClient = nil - } else { - kubeClient = kc.KubeClient - } - - gcPolicy := kubeletContainer.ContainerGCPolicy{ - MinAge: kc.MinimumGCAge, - MaxPerPodContainer: kc.MaxPerPodContainerCount, - MaxContainers: kc.MaxContainerCount, - } - - // create main pod source - pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder) - updates := pc.Channel(MESOS_CFG_SOURCE) - executorDone := make(chan struct{}) - go func() { - // execUpdates will be closed by the executor on shutdown - defer close(executorDone) - - for u := range execUpdates { - u.Source = MESOS_CFG_SOURCE - updates <- u - } - }() - - // create static-pods directory file source - log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - - klet, err := kubelet.NewMainKubelet( - kc.Hostname, - kc.NodeName, - kc.DockerClient, - kubeClient, - kc.RootDirectory, - kc.PodInfraContainerImage, - kc.SyncFrequency, - float32(kc.RegistryPullQPS), - kc.RegistryBurst, - kc.EventRecordQPS, - kc.EventBurst, - gcPolicy, - pc.SeenAllSources, - kc.RegisterNode, - kc.RegisterSchedulable, - kc.StandaloneMode, - kc.ClusterDomain, - net.IP(kc.ClusterDNS), - kc.MasterServiceNamespace, - kc.VolumePlugins, - kc.NetworkPlugins, - kc.NetworkPluginName, - kc.StreamingConnectionIdleTimeout, - kc.Recorder, - kc.CAdvisorInterface, - kc.ImageGCPolicy, - kc.DiskSpacePolicy, - kc.Cloud, - kc.NodeStatusUpdateFrequency, - kc.ResourceContainer, - kc.OSInterface, - kc.CgroupRoot, - kc.ContainerRuntime, - kc.RktPath, - kc.RktStage1Image, - kc.Mounter, - kc.Writer, - kc.DockerDaemonContainer, - kc.SystemContainer, - kc.ConfigureCBR0, - kc.PodCIDR, - kc.ReconcileCIDR, - kc.MaxPods, - kc.DockerExecHandler, - kc.ResolverConfig, - kc.CPUCFSQuota, - 
&api.NodeDaemonEndpoints{ - KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)}, - }, - kc.OOMAdjuster, - ) - if err != nil { - return nil, nil, err - } - - ks.kletLock.Lock() - ks.klet = klet - ks.kletLock.Unlock() - - // decorate kubelet such that it shuts down when the executor is - k := &executorKubelet{ - Kubelet: ks.klet, - kubeletDone: kubeletDone, - executorDone: executorDone, - } - - k.BirthCry() - k.StartGarbageCollection() - - return k, pc, nil -} - -// kubelet decorator -type kubeletExecutor struct { - *kubelet.Kubelet - address net.IP - dockerClient dockertools.DockerInterface - kubeletDone chan<- struct{} // closed once kubelet.Run() returns - executorDone <-chan struct{} // closed when executor terminates -} - -func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { - log.Infof("Starting kubelet server...") - kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers) -} - -// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. -// never returns. -func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { - defer func() { - close(kl.kubeletDone) - util.HandleCrash() - log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity - // important: never return! this is in our contract - select {} - }() - - // push merged updates into another, closable update channel which is closed - // when the executor shuts down. 
- closableUpdates := make(chan kubetypes.PodUpdate) - go func() { - // closing closableUpdates will cause our patched kubelet's syncLoop() to exit - defer close(closableUpdates) - pipeLoop: - for { - select { - case <-kl.executorDone: - break pipeLoop - default: - select { - case u := <-mergedUpdates: - select { - case closableUpdates <- u: // noop - case <-kl.executorDone: - break pipeLoop - } - case <-kl.executorDone: - break pipeLoop - } - } - } - }() - - // we expect that Run() will complete after closableUpdates is closed and the - // kubelet's syncLoop() has finished processing its backlog, which hopefully - // will not take very long. Peeking into the future (current k8s master) it - // seems that the backlog has grown from 1 to 50 -- this may negatively impact - // us going forward, time will tell. - util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) - - //TODO(jdef) revisit this if/when executor failover lands - // Force kubelet to delete all pods. - kl.HandlePodDeletions(kl.GetPods()) -} From b629278d7d52a82cb9aa4fb3865a14d52598b1d9 Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Thu, 8 Oct 2015 00:44:05 +0100 Subject: [PATCH 11/12] Set static pods dir to 0750 instead of world readable 0755 --- contrib/mesos/pkg/executor/service/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 9290fa8b5d3..a6454938927 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -220,7 +220,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { // create static pods directory staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") - err := os.Mkdir(staticPodsConfigPath, 0755) + err := os.Mkdir(staticPodsConfigPath, 0750) if err != nil { return err } From 0d8384c82034f26edfc91ac0f3363d476618217e Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 8 Oct 2015 15:57:25 +0100 Subject: [PATCH 12/12] Replace all executor_test timeouts by util.ForeverTestTimeout --- contrib/mesos/pkg/executor/executor_test.go | 25 +++++++++++---------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index a91ca55c304..029e64076eb 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" "github.com/mesos/mesos-go/mesosproto" @@ -72,7 +73,7 @@ func TestExecutorRegister(t *testing.T) { if reflect.DeepEqual(initialPodUpdate, update) { receivedInitialPodUpdate = true } - case <-time.After(time.Second): + case <-time.After(util.ForeverTestTimeout): } assert.Equal(t, true, receivedInitialPodUpdate, "executor should have sent an initial PodUpdate "+ @@ -163,7 +164,7 @@
func TestExecutorLaunchAndKillTask(t *testing.T) { select { case <-updates: - case <-time.After(time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("Executor should send an initial update on Registration") } @@ -193,7 +194,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { executor.LaunchTask(mockDriver, taskInfo) - assertext.EventuallyTrue(t, 5*time.Second, func() bool { + assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return len(executor.tasks) == 1 && len(executor.pods) == 1 @@ -205,7 +206,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { if len(update.Pods) == 1 { gotPodUpdate = true } - case <-time.After(time.Second): + case <-time.After(util.ForeverTestTimeout): } assert.Equal(t, true, gotPodUpdate, "the executor should send an update about a new pod to "+ @@ -215,7 +216,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { finished := kmruntime.After(statusUpdateCalls.Wait) select { case <-finished: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for status update calls to finish") } @@ -227,7 +228,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { executor.KillTask(mockDriver, taskInfo.TaskId) - assertext.EventuallyTrue(t, 5*time.Second, func() bool { + assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return len(executor.tasks) == 0 && len(executor.pods) == 0 @@ -237,7 +238,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { finished = kmruntime.After(statusUpdateCalls.Wait) select { case <-finished: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for status update calls to finish") } mockDriver.AssertExpectations(t) @@ -338,7 +339,7 @@ func TestExecutorStaticPods(t *testing.T) { // wait for static pod to start seenPods := map[string]struct{}{} - timeout := 
time.After(time.Second) + timeout := time.After(util.ForeverTestTimeout) defer mockDriver.AssertExpectations(t) for { // filter by PodUpdate type @@ -438,7 +439,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { // when removing the task from k.tasks through the "task-lost:foo" message below. select { case <-called: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for SendStatusUpdate for the running task") } @@ -450,7 +451,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { ).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once() executor.FrameworkMessage(mockDriver, "task-lost:foo") - assertext.EventuallyTrue(t, 5*time.Second, func() bool { + assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return len(executor.tasks) == 0 && len(executor.pods) == 0 @@ -458,7 +459,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { select { case <-called: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for SendStatusUpdate") } @@ -621,7 +622,7 @@ func TestExecutorsendFrameworkMessage(t *testing.T) { // guard against data race in mock driver between AssertExpectations and Called select { case <-called: // expected - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("expected call to SendFrameworkMessage") } mockDriver.AssertExpectations(t)