Decouple startup of executor and kubelet

This commit is contained in:
Dr. Stefan Schimanski 2015-09-23 10:16:59 +02:00
parent a60df400fd
commit dd5bafdba5
3 changed files with 167 additions and 146 deletions

View File

@ -87,7 +87,7 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool {
// kuberTask couples a Mesos task description with the name of the pod that
// was launched for it.
type kuberTask struct {
mesosTaskInfo *mesos.TaskInfo
podName string
podName string // empty until pod is sent to kubelet and registered in KubernetesExecutor.pods
}
type podStatusFunc func() (*api.PodStatus, error)
@ -102,14 +102,12 @@ type KubernetesExecutor struct {
lock sync.Mutex
client *client.Client
terminate chan struct{} // signals that the executor should shutdown
registered chan struct{} // closed when registerd
outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver
dockerClient dockertools.DockerInterface
suicideWatch suicideWatcher
suicideTimeout time.Duration
shutdownAlert func() // invoked just prior to executor shutdown
kubeletFinished <-chan struct{} // signals that kubelet Run() died
initialRegistration sync.Once
exitFunc func(int)
podStatusFunc func(*api.Pod) (*api.PodStatus, error)
staticPodsConfigPath string
@ -152,7 +150,6 @@ func New(config Config) *KubernetesExecutor {
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
podStatusFunc: config.PodStatusFunc,
registered: make(chan struct{}),
staticPodsConfigPath: config.StaticPodsConfigPath,
launchGracePeriod: config.LaunchGracePeriod,
}
@ -182,12 +179,6 @@ func New(config Config) *KubernetesExecutor {
return k
}
// InitiallyRegistered exposes, as a receive-only channel, the executor's
// "registered" signal. The channel is closed once the executor has been
// registered, so a receive on it unblocks from that point on.
func (k *KubernetesExecutor) InitiallyRegistered() <-chan struct{} {
	var registered <-chan struct{} = k.registered
	return registered
}
func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
k.killKubeletContainers()
k.resetSuicideWatch(driver)
@ -206,6 +197,15 @@ func (k *KubernetesExecutor) isDone() bool {
}
}
// sendPodUpdate pushes a copy of u onto the update channel unless the
// executor is already done. The caller must hold the state lock. The result
// reports whether the update was actually sent.
func (k *KubernetesExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool {
	if done := k.isDone(); !done {
		// note: this send blocks while the channel is full
		k.updateChan <- *u
		return true
	}
	return false
}
// Registered is called when the executor is successfully registered with the slave.
func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) {
@ -229,12 +229,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
}
}
k.updateChan <- kubetypes.PodUpdate{
// emit an empty update to allow the mesos "source" to be marked as seen
k.lock.Lock()
defer k.lock.Unlock()
k.sendPodUpdate(&kubetypes.PodUpdate{
Pods: []*api.Pod{},
Op: kubetypes.SET,
}
close(k.registered)
})
}
// Reregistered is called when the executor is successfully re-registered with the slave.
@ -254,17 +255,6 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
log.Errorf("cannot update node labels: %v", err)
}
}
// emit an empty update to allow the mesos "source" to be marked as seen
k.lock.Lock()
defer k.lock.Unlock()
if k.isDone() {
return
}
k.updateChan <- kubetypes.PodUpdate{
Pods: []*api.Pod{},
Op: kubetypes.SET,
}
}
// initializeStaticPodsSource unzips the data slice into the static-pods directory
@ -378,14 +368,10 @@ func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) {
oldPod.DeletionTimestamp = pod.DeletionTimestamp
oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
if k.isDone() {
return
}
update := kubetypes.PodUpdate{
k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.UPDATE,
Pods: []*api.Pod{oldPod},
}
k.updateChan <- update
})
}
}
}
@ -538,7 +524,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
k.lock.Lock()
defer k.lock.Unlock()
// Add the task.
// find task
task, found := k.tasks[taskId]
if !found {
log.V(1).Infof("task %v not found, probably killed: aborting launch, reporting lost", taskId)
@ -548,21 +534,23 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
//TODO(jdef) check for duplicate pod name, if found send TASK_ERROR
// from here on, we need to delete containers associated with the task
// upon it going into a terminal state
// send the new pod to the kubelet which will spin it up
ok := k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.ADD,
Pods: []*api.Pod{pod},
})
if !ok {
return // executor is terminating, cancel launch
}
// mark task as sent by setting the podName and register the sent pod
task.podName = podFullName
k.pods[podFullName] = pod
// send the new pod to the kubelet which will spin it up
if k.isDone() {
return
}
update := kubetypes.PodUpdate{
Op: kubetypes.ADD,
Pods: []*api.Pod{pod},
}
k.updateChan <- update
// From here on, we need to delete containers associated with the task upon
// it going into a terminal state.
// report task is starting to scheduler
statusUpdate := &mesos.TaskStatus{
TaskId: mutil.NewTaskID(taskId),
State: mesos.TaskState_TASK_STARTING.Enum(),
@ -575,7 +563,6 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
psf := podStatusFunc(func() (*api.PodStatus, error) {
return k.podStatusFunc(pod)
})
go k._launchTask(driver, taskId, podFullName, psf)
}
@ -751,14 +738,10 @@ func (k *KubernetesExecutor) removePodTask(driver bindings.ExecutorDriver, tid,
delete(k.pods, pid)
// tell the kubelet to remove the pod
if k.isDone() {
return
}
update := kubetypes.PodUpdate{
k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.REMOVE,
Pods: []*api.Pod{pod},
}
k.updateChan <- update
})
}
// TODO(jdef): ensure that the update propagates, perhaps return a signal chan?
k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason))

View File

@ -23,8 +23,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/kubernetes/pkg/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
type MockExecutorDriver struct {

View File

@ -35,8 +35,10 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
@ -61,7 +63,8 @@ type KubeletExecutorServer struct {
*app.KubeletServer
SuicideTimeout time.Duration
LaunchGracePeriod time.Duration
kletLock sync.Mutex
kletLock sync.Mutex // TODO(sttts): remove necessity to access the kubelet from the executor
klet *kubelet.Kubelet
}
@ -87,27 +90,7 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.")
}
// Run runs the specified KubeletExecutorServer.
func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
rand.Seed(time.Now().UTC().UnixNano())
oomAdjuster := oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil {
log.Info(err)
}
// empty string for the docker and system containers (= cgroup paths). This
// stops the kubelet taking any control over other system processes.
s.SystemContainer = ""
s.DockerDaemonContainer = ""
// create static pods directory
staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods")
err := os.Mkdir(staticPodsConfigPath, 0755)
if err != nil {
return err
}
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, staticPodsConfigPath string) error {
// create apiserver client
var apiclient *client.Client
clientConfig, err := s.CreateAPIServerClientConfig()
@ -119,6 +102,89 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
// back to the apiserver
log.Fatalf("No API client: %v", err)
}
exec := executor.New(executor.Config{
Updates: execUpdates,
APIClient: apiclient,
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
s.kletLock.Lock()
defer s.kletLock.Unlock()
if s.klet == nil {
return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized")
}
status, err := s.klet.GetRuntime().GetPodStatus(pod)
if err != nil {
return nil, err
}
status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
hostIP, err := s.klet.GetHostIP()
if err != nil {
log.Errorf("Cannot get host IP: %v", err)
} else {
status.HostIP = hostIP.String()
}
return status, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
),
})
// initialize driver and initialize the executor with it
dconfig := bindings.DriverConfig{
Executor: exec,
HostnameOverride: s.HostnameOverride,
BindingAddress: s.Address,
}
driver, err := bindings.NewMesosExecutorDriver(dconfig)
if err != nil {
return fmt.Errorf("failed to create executor driver: %v", err)
}
log.V(2).Infof("Initialize executor driver...")
exec.Init(driver)
// start the driver
go func() {
if _, err := driver.Run(); err != nil {
log.Fatalf("executor driver failed: %v", err)
}
log.Info("executor Run completed")
}()
return nil
}
// runKubelet prepares and starts the kubelet, which receives its pod updates through execUpdates.
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, staticPodsConfigPath string) error {
// empty string for the docker and system containers (= cgroup paths). This
// stops the kubelet taking any control over other system processes.
s.SystemContainer = ""
s.DockerDaemonContainer = ""
oomAdjuster := oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil {
log.Info(err)
}
dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint)
// create apiserver client
var apiclient *client.Client
clientConfig, err := s.CreateAPIServerClientConfig()
if err == nil {
apiclient, err = client.New(clientConfig)
}
if err != nil {
// required for k8sm since we need to send api.Binding information back to the apiserver
log.Fatalf("No API client: %v", err)
}
log.Infof("Using root directory: %v", s.RootDirectory)
credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
@ -138,6 +204,15 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
RootFreeDiskMB: s.LowDiskSpaceThresholdMB,
}
manifestURLHeader := make(http.Header)
if s.ManifestURLHeader != "" {
pieces := strings.Split(s.ManifestURLHeader, ":")
if len(pieces) != 2 {
return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
//TODO(jdef) intentionally NOT initializing a cloud provider here since:
//(a) the kubelet doesn't actually use it
//(b) we don't need to create N-kubelet connections to zookeeper for no good reason
@ -181,75 +256,6 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}
dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint)
//TODO(jdef) either configure Watch here with something useful, or else
// get rid of it from executor.Config
kubeletFinished := make(chan struct{})
execUpdates := make(chan kubetypes.PodUpdate, 1)
exec := executor.New(executor.Config{
Updates: execUpdates,
APIClient: apiclient,
Docker: dockerClient,
SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
s.kletLock.Lock()
defer s.kletLock.Unlock()
if s.klet == nil {
return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized")
}
status, err := s.klet.GetRuntime().GetPodStatus(pod)
if err != nil {
return nil, err
}
status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
hostIP, err := s.klet.GetHostIP()
if err != nil {
log.Errorf("Cannot get host IP: %v", err)
} else {
status.HostIP = hostIP.String()
}
return status, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
})
manifestURLHeader := make(http.Header)
if s.ManifestURLHeader != "" {
pieces := strings.Split(s.ManifestURLHeader, ":")
if len(pieces) != 2 {
return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
// initialize driver and initialize the executor with it
dconfig := bindings.DriverConfig{
Executor: exec,
HostnameOverride: s.HostnameOverride,
BindingAddress: s.Address,
}
driver, err := bindings.NewMesosExecutorDriver(dconfig)
if err != nil {
log.Fatalf("failed to create executor driver: %v", err)
}
log.V(2).Infof("Initialize executor driver...")
exec.Init(driver)
// start the driver
go func() {
if _, err := driver.Run(); err != nil {
log.Fatalf("executor driver failed: %v", err)
}
log.Info("executor Run completed")
}()
<-exec.InitiallyRegistered()
// prepare kubelet
kcfg := app.KubeletConfig{
@ -335,6 +341,37 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
}, 5*time.Second, util.NeverStop)
}
return nil
}
// Run starts the executor first and the kubelet second, connecting the two
// through a shared pod-update channel and a "kubelet finished" signal channel.
// It returns an error when the static pods directory cannot be created or
// when either component fails to start; otherwise it never returns.
func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
	rand.Seed(time.Now().UTC().UnixNano())

	// create the directory that holds the static pods
	staticPodsDir := filepath.Join(s.RootDirectory, "static-pods")
	if err := os.Mkdir(staticPodsDir, 0755); err != nil {
		return err
	}

	// channels shared between the executor and the kubelet
	updates := make(chan kubetypes.PodUpdate, 1)
	kubeletDone := make(chan struct{})

	// bring up the executor
	if err := s.runExecutor(updates, kubeletDone, staticPodsDir); err != nil {
		return err
	}

	// bring up the kubelet
	if err := s.runKubelet(updates, kubeletDone, staticPodsDir); err != nil {
		close(kubeletDone) // let the executor know the kubelet is not running
		return err
	}

	// Park this goroutine forever; from here on the function does not return.
	select {}
}
@ -531,3 +568,4 @@ func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
}