From dd5bafdba59fd3d7de67d13703e1b93677b7d4ae Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 23 Sep 2015 10:16:59 +0200 Subject: [PATCH] Decouple startup of executor and kubelet --- contrib/mesos/pkg/executor/executor.go | 83 +++---- contrib/mesos/pkg/executor/mock_test.go | 8 +- contrib/mesos/pkg/executor/service/service.go | 222 ++++++++++-------- 3 files changed, 167 insertions(+), 146 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 14d5558df56..14ba8e3433e 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -87,7 +87,7 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool { type kuberTask struct { mesosTaskInfo *mesos.TaskInfo - podName string + podName string // empty until pod is sent to kubelet and registered in KubernetesExecutor.pods } type podStatusFunc func() (*api.PodStatus, error) @@ -102,14 +102,12 @@ type KubernetesExecutor struct { lock sync.Mutex client *client.Client terminate chan struct{} // signals that the executor should shutdown - registered chan struct{} // closed when registerd outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver dockerClient dockertools.DockerInterface suicideWatch suicideWatcher suicideTimeout time.Duration shutdownAlert func() // invoked just prior to executor shutdown kubeletFinished <-chan struct{} // signals that kubelet Run() died - initialRegistration sync.Once exitFunc func(int) podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string @@ -152,7 +150,6 @@ func New(config Config) *KubernetesExecutor { shutdownAlert: config.ShutdownAlert, exitFunc: config.ExitFunc, podStatusFunc: config.PodStatusFunc, - registered: make(chan struct{}), staticPodsConfigPath: config.StaticPodsConfigPath, launchGracePeriod: config.LaunchGracePeriod, } @@ -182,12 +179,6 @@ func New(config Config) *KubernetesExecutor { return k } -// 
InitiallyRegistered returns a channel which is closed when the executor is -// registered with the Mesos master. -func (k *KubernetesExecutor) InitiallyRegistered() <-chan struct{} { - return k.registered -} - func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) @@ -206,6 +197,15 @@ func (k *KubernetesExecutor) isDone() bool { } } +// sendPodUpdate assumes that caller is holding state lock; returns true when update is sent otherwise false +func (k *KubernetesExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool { + if k.isDone() { + return false + } + k.updateChan <- *u + return true +} + // Registered is called when the executor is successfully registered with the slave. func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) { @@ -229,12 +229,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } } - k.updateChan <- kubetypes.PodUpdate{ + // emit an empty update to allow the mesos "source" to be marked as seen + k.lock.Lock() + defer k.lock.Unlock() + k.sendPodUpdate(&kubetypes.PodUpdate{ Pods: []*api.Pod{}, Op: kubetypes.SET, - } - - close(k.registered) + }) } // Reregistered is called when the executor is successfully re-registered with the slave. 
@@ -254,17 +255,6 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI log.Errorf("cannot update node labels: %v", err) } } - - // emit an empty update to allow the mesos "source" to be marked as seen - k.lock.Lock() - defer k.lock.Unlock() - if k.isDone() { - return - } - k.updateChan <- kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - } } // initializeStaticPodsSource unzips the data slice into the static-pods directory @@ -378,14 +368,10 @@ func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) { oldPod.DeletionTimestamp = pod.DeletionTimestamp oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds - if k.isDone() { - return - } - update := kubetypes.PodUpdate{ + k.sendPodUpdate(&kubetypes.PodUpdate{ Op: kubetypes.UPDATE, Pods: []*api.Pod{oldPod}, - } - k.updateChan <- update + }) } } } @@ -538,7 +524,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s k.lock.Lock() defer k.lock.Unlock() - // Add the task. 
+ // find task task, found := k.tasks[taskId] if !found { log.V(1).Infof("task %v not found, probably killed: aborting launch, reporting lost", taskId) @@ -548,21 +534,23 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s //TODO(jdef) check for duplicate pod name, if found send TASK_ERROR - // from here on, we need to delete containers associated with the task - // upon it going into a terminal state + // send the new pod to the kubelet which will spin it up + ok := k.sendPodUpdate(&kubetypes.PodUpdate{ + Op: kubetypes.ADD, + Pods: []*api.Pod{pod}, + }) + if !ok { + return // executor is terminating, cancel launch + } + + // mark task as sent by setting the podName and register the sent pod task.podName = podFullName k.pods[podFullName] = pod - // send the new pod to the kubelet which will spin it up - if k.isDone() { - return - } - update := kubetypes.PodUpdate{ - Op: kubetypes.ADD, - Pods: []*api.Pod{pod}, - } - k.updateChan <- update + // From here on, we need to delete containers associated with the task upon + // it going into a terminal state. + // report task is starting to scheduler statusUpdate := &mesos.TaskStatus{ TaskId: mutil.NewTaskID(taskId), State: mesos.TaskState_TASK_STARTING.Enum(), @@ -575,7 +563,6 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s psf := podStatusFunc(func() (*api.PodStatus, error) { return k.podStatusFunc(pod) }) - go k._launchTask(driver, taskId, podFullName, psf) } @@ -751,14 +738,10 @@ func (k *KubernetesExecutor) removePodTask(driver bindings.ExecutorDriver, tid, delete(k.pods, pid) // tell the kubelet to remove the pod - if k.isDone() { - return - } - update := kubetypes.PodUpdate{ + k.sendPodUpdate(&kubetypes.PodUpdate{ Op: kubetypes.REMOVE, Pods: []*api.Pod{pod}, - } - k.updateChan <- update + }) } // TODO(jdef): ensure that the update propagates, perhaps return a signal chan? 
k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason)) diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index f73039ff7dd..1a8b35ac6a2 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -23,8 +23,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "k8s.io/kubernetes/pkg/api" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/dockertools" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) type MockExecutorDriver struct { @@ -69,9 +69,9 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) { updates := make(chan kubetypes.PodUpdate, 1024) return New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: updates, - PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: updates, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, }), updates } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 45488830f9f..8528c187b7f 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -35,8 +35,10 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/credentialprovider" + "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -61,8 +63,9 @@ type KubeletExecutorServer struct { *app.KubeletServer SuicideTimeout time.Duration LaunchGracePeriod time.Duration - kletLock sync.Mutex - klet *kubelet.Kubelet + + kletLock 
sync.Mutex // TODO(sttts): remove necessity to access the kubelet from the executor + klet *kubelet.Kubelet } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -87,27 +90,7 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.") } -// Run runs the specified KubeletExecutorServer. -func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { - rand.Seed(time.Now().UTC().UnixNano()) - - oomAdjuster := oom.NewOOMAdjuster() - if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil { - log.Info(err) - } - - // empty string for the docker and system containers (= cgroup paths). This - // stops the kubelet taking any control over other system processes. - s.SystemContainer = "" - s.DockerDaemonContainer = "" - - // create static pods directory - staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") - err := os.Mkdir(staticPodsConfigPath, 0755) - if err != nil { - return err - } - +func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, staticPodsConfigPath string) error { // create apiserver client var apiclient *client.Client clientConfig, err := s.CreateAPIServerClientConfig() @@ -119,6 +102,89 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { // back to the apiserver log.Fatalf("No API client: %v", err) } + exec := executor.New(executor.Config{ + Updates: execUpdates, + APIClient: apiclient, + Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + SuicideTimeout: s.SuicideTimeout, + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + s.kletLock.Lock() + defer s.kletLock.Unlock() + + if s.klet == nil { + return nil, fmt.Errorf("PodStatucFunc called 
before kubelet is initialized") + } + + status, err := s.klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := s.klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil + }, + StaticPodsConfigPath: staticPodsConfigPath, + PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll, + fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride), + ), + }) + + // initialize driver and initialize the executor with it + dconfig := bindings.DriverConfig{ + Executor: exec, + HostnameOverride: s.HostnameOverride, + BindingAddress: s.Address, + } + driver, err := bindings.NewMesosExecutorDriver(dconfig) + if err != nil { + return fmt.Errorf("failed to create executor driver: %v", err) + } + log.V(2).Infof("Initialize executor driver...") + exec.Init(driver) + + // start the driver + go func() { + if _, err := driver.Run(); err != nil { + log.Fatalf("executor driver failed: %v", err) + } + log.Info("executor Run completed") + }() + + return nil +} + +// runKubelet sets up and starts the kubelet for this KubeletExecutorServer. +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, staticPodsConfigPath string) error { + // empty string for the docker and system containers (= cgroup paths). This + // stops the kubelet taking any control over other system processes. 
+ s.SystemContainer = "" + s.DockerDaemonContainer = "" + + oomAdjuster := oom.NewOOMAdjuster() + if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil { + log.Info(err) + } + + dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) + + // create apiserver client + var apiclient *client.Client + clientConfig, err := s.CreateAPIServerClientConfig() + if err == nil { + apiclient, err = client.New(clientConfig) + } + if err != nil { + // required for k8sm since we need to send api.Binding information back to the apiserver + log.Fatalf("No API client: %v", err) + } log.Infof("Using root directory: %v", s.RootDirectory) credentialprovider.SetPreferredDockercfgPath(s.RootDirectory) @@ -138,6 +204,15 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { RootFreeDiskMB: s.LowDiskSpaceThresholdMB, } + manifestURLHeader := make(http.Header) + if s.ManifestURLHeader != "" { + pieces := strings.Split(s.ManifestURLHeader, ":") + if len(pieces) != 2 { + return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) + } + manifestURLHeader.Set(pieces[0], pieces[1]) + } + //TODO(jdef) intentionally NOT initializing a cloud provider here since: //(a) the kubelet doesn't actually use it //(b) we don't need to create N-kubelet connections to zookeeper for no good reason @@ -181,75 +256,6 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) dockerExecHandler = &dockertools.NativeExecHandler{} } - dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) - - //TODO(jdef) either configure Watch here with something useful, or else - // get rid of it from executor.Config - kubeletFinished := make(chan struct{}) - execUpdates := make(chan kubetypes.PodUpdate, 1) - exec := executor.New(executor.Config{ - Updates: execUpdates, - APIClient: apiclient, - 
Docker: dockerClient, - SuicideTimeout: s.SuicideTimeout, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { - s.kletLock.Lock() - defer s.kletLock.Unlock() - - if s.klet == nil { - return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized") - } - - status, err := s.klet.GetRuntime().GetPodStatus(pod) - if err != nil { - return nil, err - } - - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := s.klet.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil - }, - StaticPodsConfigPath: staticPodsConfigPath, - }) - - manifestURLHeader := make(http.Header) - if s.ManifestURLHeader != "" { - pieces := strings.Split(s.ManifestURLHeader, ":") - if len(pieces) != 2 { - return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) - } - manifestURLHeader.Set(pieces[0], pieces[1]) - } - - // initialize driver and initialize the executor with it - dconfig := bindings.DriverConfig{ - Executor: exec, - HostnameOverride: s.HostnameOverride, - BindingAddress: s.Address, - } - driver, err := bindings.NewMesosExecutorDriver(dconfig) - if err != nil { - log.Fatalf("failed to create executor driver: %v", err) - } - log.V(2).Infof("Initialize executor driver...") - exec.Init(driver) - - // start the driver - go func() { - if _, err := driver.Run(); err != nil { - log.Fatalf("executor driver failed: %v", err) - } - log.Info("executor Run completed") - }() - - <-exec.InitiallyRegistered() // prepare kubelet kcfg := app.KubeletConfig{ @@ -335,6 +341,37 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { }, 5*time.Second, util.NeverStop) } + return nil +} + +// Run runs the specified KubeletExecutorServer. 
+func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { + rand.Seed(time.Now().UTC().UnixNano()) + + // create shared channels + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return err + } + + // start executor + err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath) + if err != nil { + return err + } + + // start kubelet + err = s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath) + if err != nil { + close(kubeletFinished) // tell executor + return err + } + // block until executor is shut down or commits shutdown select {} } @@ -531,3 +568,4 @@ func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { // Force kubelet to delete all pods. kl.HandlePodDeletions(kl.GetPods()) } +