Merge pull request #13036 from mesosphere/decouple-executor-and-kubelet

Auto commit by PR queue bot
k8s-merge-robot 2015-10-10 08:03:37 -07:00
commit dfbad569fd
6 changed files with 366 additions and 558 deletions

View File

@@ -19,7 +19,6 @@ package executor
import (
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
@@ -40,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -89,57 +87,45 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool {
type kuberTask struct {
mesosTaskInfo *mesos.TaskInfo
podName string
podName string // empty until the pod is sent to the kubelet and registered in KubernetesExecutor.pods
}
type podStatusFunc func() (*api.PodStatus, error)
// KubeletInterface consists of the kubelet.Kubelet APIs that we actually use
type KubeletInterface interface {
GetHostIP() (net.IP, error)
}
// KubernetesExecutor is a mesos executor that runs pods
// on a minion machine.
type KubernetesExecutor struct {
kl KubeletInterface // the kubelet instance.
updateChan chan<- interface{} // to send pod config updates to the kubelet
updateChan chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown
state stateType
tasks map[string]*kuberTask
pods map[string]*api.Pod
lock sync.RWMutex
sourcename string
lock sync.Mutex
client *client.Client
done chan struct{} // signals shutdown
terminate chan struct{} // signals that the executor should shut down
outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver
dockerClient dockertools.DockerInterface
suicideWatch suicideWatcher
suicideTimeout time.Duration
shutdownAlert func() // invoked just prior to executor shutdown
kubeletFinished <-chan struct{} // signals that kubelet Run() died
initialRegistration sync.Once
exitFunc func(int)
podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
staticPodsConfig []byte
podStatusFunc func(*api.Pod) (*api.PodStatus, error)
staticPodsConfigPath string
initialRegComplete chan struct{}
podController *framework.Controller
launchGracePeriod time.Duration
}
type Config struct {
Kubelet KubeletInterface
Updates chan<- interface{} // to send pod config updates to the kubelet
SourceName string
Updates chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet
APIClient *client.Client
Docker dockertools.DockerInterface
ShutdownAlert func()
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
PodStatusFunc func(*api.Pod) (*api.PodStatus, error)
StaticPodsConfigPath string
PodLW cache.ListerWatcher
PodLW cache.ListerWatcher // mandatory; otherwise initialization will panic
LaunchGracePeriod time.Duration
}
@@ -150,14 +136,12 @@ func (k *KubernetesExecutor) isConnected() bool {
// New creates a new kubernetes executor.
func New(config Config) *KubernetesExecutor {
k := &KubernetesExecutor{
kl: config.Kubelet,
updateChan: config.Updates,
state: disconnectedState,
tasks: make(map[string]*kuberTask),
pods: make(map[string]*api.Pod),
sourcename: config.SourceName,
client: config.APIClient,
done: make(chan struct{}),
terminate: make(chan struct{}),
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
@@ -166,12 +150,15 @@ func New(config Config) *KubernetesExecutor {
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
podStatusFunc: config.PodStatusFunc,
initialRegComplete: make(chan struct{}),
staticPodsConfigPath: config.StaticPodsConfigPath,
launchGracePeriod: config.LaunchGracePeriod,
}
// watch pods from the given pod ListWatch
if config.PodLW == nil {
// fail early to make debugging easier
panic("cannot create executor with nil PodLW")
}
_, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
@@ -196,24 +183,29 @@ func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
k.killKubeletContainers()
k.resetSuicideWatch(driver)
go k.podController.Run(k.done)
go k.podController.Run(k.terminate)
go k.sendLoop()
//TODO(jdef) monitor kubeletFinished and shut down if it happens
}
func (k *KubernetesExecutor) Done() <-chan struct{} {
return k.done
}
func (k *KubernetesExecutor) isDone() bool {
select {
case <-k.done:
case <-k.terminate:
return true
default:
return false
}
}
// sendPodUpdate assumes that the caller is holding the state lock; it returns true when the update is sent, otherwise false
func (k *KubernetesExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool {
if k.isDone() {
return false
}
k.updateChan <- *u
return true
}
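The guard above is what makes closing updateChan during shutdown safe: every send first checks the terminate channel, and for the drop-instead-of-panic behavior to be race-free, the check-and-send and the close must be serialized — here by the state lock that sendPodUpdate's callers hold. A minimal, self-contained sketch of the same pattern (illustrative names, not code from this PR):

package main

import (
	"fmt"
	"sync"
)

type sender struct {
	lock      sync.Mutex
	terminate chan struct{}
	updates   chan string
}

func (s *sender) isDone() bool {
	select {
	case <-s.terminate:
		return true
	default:
		return false
	}
}

// send must be called with s.lock held; it returns false (and drops the
// update) once shutdown has begun, instead of panicking on a closed channel.
func (s *sender) send(u string) bool {
	if s.isDone() {
		return false
	}
	s.updates <- u
	return true
}

// shutdown takes the same lock, so it cannot interleave with a send.
func (s *sender) shutdown() {
	s.lock.Lock()
	defer s.lock.Unlock()
	close(s.terminate)
	close(s.updates)
}

func main() {
	s := &sender{terminate: make(chan struct{}), updates: make(chan string, 1)}
	s.lock.Lock()
	fmt.Println(s.send("pod update")) // true
	s.lock.Unlock()
	s.shutdown()
	s.lock.Lock()
	fmt.Println(s.send("late update")) // false: dropped, no panic
	s.lock.Unlock()
}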
// Registered is called when the executor is successfully registered with the slave.
func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) {
@@ -227,7 +219,7 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
}
if executorInfo != nil && executorInfo.Data != nil {
k.staticPodsConfig = executorInfo.Data
k.initializeStaticPodsSource(executorInfo.Data)
}
if slaveInfo != nil {
@@ -237,7 +229,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
}
}
k.initialRegistration.Do(k.onInitialRegistration)
// emit an empty update to allow the mesos "source" to be marked as seen
k.lock.Lock()
defer k.lock.Unlock()
k.sendPodUpdate(&kubetypes.PodUpdate{
Pods: []*api.Pod{},
Op: kubetypes.SET,
})
}
// Reregistered is called when the executor is successfully re-registered with the slave.
@@ -257,39 +255,16 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
log.Errorf("cannot update node labels: %v", err)
}
}
k.initialRegistration.Do(k.onInitialRegistration)
}
func (k *KubernetesExecutor) onInitialRegistration() {
defer close(k.initialRegComplete)
// emit an empty update to allow the mesos "source" to be marked as seen
k.updateChan <- kubetypes.PodUpdate{
Pods: []*api.Pod{},
Op: kubetypes.SET,
Source: k.sourcename,
}
}
// InitializeStaticPodsSource blocks until initial registration is complete and
// then creates a static pod source using the given factory func.
func (k *KubernetesExecutor) InitializeStaticPodsSource(sourceFactory func()) {
<-k.initialRegComplete
if k.staticPodsConfig == nil {
return
}
// initializeStaticPodsSource unzips the data slice into the static-pods directory
func (k *KubernetesExecutor) initializeStaticPodsSource(data []byte) {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
err := archive.UnzipDir(k.staticPodsConfig, k.staticPodsConfigPath)
err := archive.UnzipDir(data, k.staticPodsConfigPath)
if err != nil {
log.Errorf("Failed to extract static pod config: %v", err)
return
}
log.V(2).Infof("initializing static pods source factory, configured at path %q", k.staticPodsConfigPath)
sourceFactory()
}
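For reference, archive.UnzipDir expands the ExecutorInfo.Data payload as a zip archive into the static-pods directory. A hedged, stdlib-only sketch of what such a helper has to do (this is not the PR's implementation, and a production version must also reject path-traversal entry names):

package main

import (
	"archive/zip"
	"bytes"
	"io"
	"os"
	"path/filepath"
)

// unzipDir expands a zip archive held in memory (e.g. ExecutorInfo.Data)
// into dest, creating directories as needed.
func unzipDir(data []byte, dest string) error {
	r, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
	if err != nil {
		return err
	}
	for _, f := range r.File {
		path := filepath.Join(dest, f.Name) // real code must reject ".." entries
		if f.FileInfo().IsDir() {
			if err := os.MkdirAll(path, 0750); err != nil {
				return err
			}
			continue
		}
		if err := os.MkdirAll(filepath.Dir(path), 0750); err != nil {
			return err
		}
		src, err := f.Open()
		if err != nil {
			return err
		}
		dst, err := os.Create(path)
		if err != nil {
			src.Close()
			return err
		}
		_, cerr := io.Copy(dst, src)
		src.Close()
		dst.Close()
		if cerr != nil {
			return cerr
		}
	}
	return nil
}

func main() {
	// data would be executorInfo.Data in the executor; an empty slice just
	// produces "zip: not a valid zip file".
	if err := unzipDir([]byte{}, os.TempDir()); err != nil {
		println(err.Error())
	}
}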
// Disconnected is called when the executor is disconnected from the slave.
@@ -393,11 +368,10 @@ func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) {
oldPod.DeletionTimestamp = pod.DeletionTimestamp
oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
update := kubetypes.PodUpdate{
k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.UPDATE,
Pods: []*api.Pod{oldPod},
}
k.updateChan <- update
})
}
}
}
@@ -550,7 +524,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
k.lock.Lock()
defer k.lock.Unlock()
// Add the task.
// find task
task, found := k.tasks[taskId]
if !found {
log.V(1).Infof("task %v not found, probably killed: aborting launch, reporting lost", taskId)
@@ -560,18 +534,23 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
//TODO(jdef) check for duplicate pod name, if found send TASK_ERROR
// from here on, we need to delete containers associated with the task
// upon it going into a terminal state
// send the new pod to the kubelet which will spin it up
ok := k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.ADD,
Pods: []*api.Pod{pod},
})
if !ok {
return // executor is terminating, cancel launch
}
// mark task as sent by setting the podName and register the sent pod
task.podName = podFullName
k.pods[podFullName] = pod
// send the new pod to the kubelet which will spin it up
update := kubetypes.PodUpdate{
Op: kubetypes.ADD,
Pods: []*api.Pod{pod},
}
k.updateChan <- update
// From here on, we need to delete containers associated with the task upon
// it going into a terminal state.
// report to the scheduler that the task is starting
statusUpdate := &mesos.TaskStatus{
TaskId: mutil.NewTaskID(taskId),
State: mesos.TaskState_TASK_STARTING.Enum(),
@@ -582,20 +561,8 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
// Delay reporting 'task running' until container is up.
psf := podStatusFunc(func() (*api.PodStatus, error) {
status, err := k.podStatusFunc(k.kl, pod)
if err != nil {
return nil, err
}
status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
hostIP, err := k.kl.GetHostIP()
if err != nil {
log.Errorf("Cannot get host IP: %v", err)
} else {
status.HostIP = hostIP.String()
}
return status, nil
return k.podStatusFunc(pod)
})
go k._launchTask(driver, taskId, podFullName, psf)
}
@@ -771,11 +738,10 @@ func (k *KubernetesExecutor) removePodTask(driver bindings.ExecutorDriver, tid,
delete(k.pods, pid)
// tell the kubelet to remove the pod
update := kubetypes.PodUpdate{
k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.REMOVE,
Pods: []*api.Pod{pod},
}
k.updateChan <- update
})
}
// TODO(jdef): ensure that the update propagates, perhaps return a signal chan?
k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason))
@@ -828,7 +794,8 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) {
(&k.state).transitionTo(terminalState)
// signal to all listeners that this KubeletExecutor is done!
close(k.done)
close(k.terminate)
close(k.updateChan)
if k.shutdownAlert != nil {
func() {
@@ -902,7 +869,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes
func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) {
select {
case <-k.done:
case <-k.terminate:
default:
k.outgoing <- func() (mesos.Status, error) { return driver.SendStatusUpdate(status) }
}
@@ -910,7 +877,7 @@ func (k *KubernetesExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) {
func (k *KubernetesExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) {
select {
case <-k.done:
case <-k.terminate:
default:
k.outgoing <- func() (mesos.Status, error) { return driver.SendFrameworkMessage(msg) }
}
@@ -920,12 +887,12 @@ func (k *KubernetesExecutor) sendLoop() {
defer log.V(1).Info("sender loop exiting")
for {
select {
case <-k.done:
case <-k.terminate:
return
default:
if !k.isConnected() {
select {
case <-k.done:
case <-k.terminate:
case <-time.After(1 * time.Second):
}
continue
@@ -945,7 +912,7 @@ func (k *KubernetesExecutor) sendLoop() {
}
// attempt to re-queue the sender
select {
case <-k.done:
case <-k.terminate:
case k.outgoing <- sender:
}
}
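Isolated from the executor, the send loop above has this shape: drain a buffered queue of driver calls, idle while disconnected, and re-queue a call that fails, until terminate closes. A runnable sketch with stand-ins for the driver plumbing (all names illustrative, not code from this PR):

package main

import (
	"errors"
	"fmt"
	"time"
)

func sendLoop(terminate <-chan struct{}, connected func() bool, outgoing chan func() error) {
	for {
		select {
		case <-terminate:
			return
		default:
			if !connected() {
				select {
				case <-terminate:
					return
				case <-time.After(1 * time.Second):
				}
				continue
			}
			var sender func() error
			select {
			case <-terminate:
				return
			case sender = <-outgoing:
			}
			if err := sender(); err != nil {
				// attempt to re-queue the sender, unless we are shutting down
				select {
				case <-terminate:
				case outgoing <- sender:
				}
			}
		}
	}
}

func main() {
	terminate := make(chan struct{})
	outgoing := make(chan func() error, 16)
	attempts := 0
	outgoing <- func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient driver error")
		}
		fmt.Printf("status update delivered on attempt %d\n", attempts)
		close(terminate)
		return nil
	}
	sendLoop(terminate, func() bool { return true }, outgoing)
}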

View File

@@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
"github.com/mesos/mesos-go/mesosproto"
@@ -57,12 +58,7 @@ import (
// after Register is called.
func TestExecutorRegister(t *testing.T) {
mockDriver := &MockExecutorDriver{}
updates := make(chan interface{}, 1024)
executor := New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates,
SourceName: "executor_test",
})
executor, updates := NewTestKubernetesExecutor()
executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil)
@@ -70,18 +66,14 @@ func TestExecutorRegister(t *testing.T) {
initialPodUpdate := kubetypes.PodUpdate{
Pods: []*api.Pod{},
Op: kubetypes.SET,
Source: executor.sourcename,
}
receivedInitialPodUpdate := false
select {
case m := <-updates:
update, ok := m.(kubetypes.PodUpdate)
if ok {
case update := <-updates:
if reflect.DeepEqual(initialPodUpdate, update) {
receivedInitialPodUpdate = true
}
}
case <-time.After(time.Second):
case <-time.After(util.ForeverTestTimeout):
}
assert.Equal(t, true, receivedInitialPodUpdate,
"executor should have sent an initial PodUpdate "+
@@ -95,7 +87,7 @@ func TestExecutorRegister(t *testing.T) {
// connected after a call to Disconnected has occurred.
func TestExecutorDisconnect(t *testing.T) {
mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor()
executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil)
@@ -110,7 +102,7 @@ func TestExecutorDisconnect(t *testing.T) {
// after a connection problem happens, followed by a call to Reregistered.
func TestExecutorReregister(t *testing.T) {
mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor()
executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil)
@@ -141,7 +133,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
defer testApiServer.server.Close()
mockDriver := &MockExecutorDriver{}
updates := make(chan interface{}, 1024)
updates := make(chan kubetypes.PodUpdate, 1024)
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates,
@@ -149,11 +141,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
}),
Kubelet: &fakeKubelet{
Kubelet: &kubelet.Kubelet{},
hostIP: net.IPv4(127, 0, 0, 1),
},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
@@ -164,8 +152,10 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
},
},
Phase: api.PodRunning,
HostIP: "127.0.0.1",
}, nil
},
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
}
executor := New(config)
@@ -174,7 +164,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
select {
case <-updates:
case <-time.After(time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("Executor should send an initial update on Registration")
}
@@ -204,7 +194,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
executor.LaunchTask(mockDriver, taskInfo)
assertext.EventuallyTrue(t, 5*time.Second, func() bool {
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return len(executor.tasks) == 1 && len(executor.pods) == 1
@@ -212,12 +202,11 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
gotPodUpdate := false
select {
case m := <-updates:
update, ok := m.(kubetypes.PodUpdate)
if ok && len(update.Pods) == 1 {
case update := <-updates:
if len(update.Pods) == 1 {
gotPodUpdate = true
}
case <-time.After(time.Second):
case <-time.After(util.ForeverTestTimeout):
}
assert.Equal(t, true, gotPodUpdate,
"the executor should send an update about a new pod to "+
@@ -227,7 +216,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
finished := kmruntime.After(statusUpdateCalls.Wait)
select {
case <-finished:
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("timed out waiting for status update calls to finish")
}
@@ -239,7 +228,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
executor.KillTask(mockDriver, taskInfo.TaskId)
assertext.EventuallyTrue(t, 5*time.Second, func() bool {
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return len(executor.tasks) == 0 && len(executor.pods) == 0
@@ -249,7 +238,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
finished = kmruntime.After(statusUpdateCalls.Wait)
select {
case <-finished:
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("timed out waiting for status update calls to finish")
}
mockDriver.AssertExpectations(t)
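All of the waits in this test share one idiom: select on the channel of interest against time.After(util.ForeverTestTimeout) and fail on timeout instead of hanging. A standalone sketch of that idiom as a helper (hypothetical names; the real tests inline the select):

package main

import (
	"testing"
	"time"
)

// receiveOrFail mirrors the select/time.After idiom used above: wait up to
// timeout for an update, failing the test instead of blocking forever.
func receiveOrFail(t *testing.T, updates <-chan string, timeout time.Duration) string {
	select {
	case u := <-updates:
		return u
	case <-time.After(timeout):
		t.Fatalf("timed out after %v waiting for update", timeout)
		return ""
	}
}

func TestReceiveOrFail(t *testing.T) {
	updates := make(chan string, 1)
	updates <- "initial SET update"
	if got := receiveOrFail(t, updates, time.Second); got != "initial SET update" {
		t.Errorf("unexpected update: %q", got)
	}
}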
@@ -307,16 +296,14 @@ func TestExecutorStaticPods(t *testing.T) {
defer os.RemoveAll(staticPodsConfigPath)
mockDriver := &MockExecutorDriver{}
updates := make(chan interface{}, 1024)
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init
Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
}),
Kubelet: &kubelet.Kubelet{},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
@@ -330,12 +317,14 @@ func TestExecutorStaticPods(t *testing.T) {
}, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
}
executor := New(config)
// register static pod source
hostname := "h1"
go executor.InitializeStaticPodsSource(func() {
kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates)
})
fileSourceUpdates := make(chan interface{}, 1024)
kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, fileSourceUpdates)
// create ExecutorInfo with static pod zip in data field
executorInfo := mesosutil.NewExecutorInfo(
@@ -350,14 +339,14 @@ func TestExecutorStaticPods(t *testing.T) {
// wait for static pod to start
seenPods := map[string]struct{}{}
timeout := time.After(time.Second)
timeout := time.After(util.ForeverTestTimeout)
defer mockDriver.AssertExpectations(t)
for {
// filter by PodUpdate type
select {
case <-timeout:
t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods))
case update, ok := <-updates:
case update, ok := <-fileSourceUpdates:
if !ok {
return
}
@@ -391,16 +380,12 @@ func TestExecutorFrameworkMessage(t *testing.T) {
kubeletFinished := make(chan struct{})
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1024),
Updates: make(chan kubetypes.PodUpdate, 1024),
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
}),
Kubelet: &fakeKubelet{
Kubelet: &kubelet.Kubelet{},
hostIP: net.IPv4(127, 0, 0, 1),
},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
@@ -411,12 +396,14 @@ func TestExecutorFrameworkMessage(t *testing.T) {
},
},
Phase: api.PodRunning,
HostIP: "127.0.0.1",
}, nil
},
ShutdownAlert: func() {
close(kubeletFinished)
},
KubeletFinished: kubeletFinished,
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
}
executor := New(config)
@@ -452,7 +439,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
// when removing the task from k.tasks through the "task-lost:foo" message below.
select {
case <-called:
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("timed out waiting for SendStatusUpdate for the running task")
}
@@ -464,7 +451,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once()
executor.FrameworkMessage(mockDriver, "task-lost:foo")
assertext.EventuallyTrue(t, 5*time.Second, func() bool {
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return len(executor.tasks) == 0 && len(executor.pods) == 0
@@ -472,7 +459,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
select {
case <-called:
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("timed out waiting for SendStatusUpdate")
}
@@ -569,9 +556,10 @@ func TestExecutorShutdown(t *testing.T) {
mockDriver := &MockExecutorDriver{}
kubeletFinished := make(chan struct{})
var exitCalled int32 = 0
updates := make(chan kubetypes.PodUpdate, 1024)
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1024),
Updates: updates,
ShutdownAlert: func() {
close(kubeletFinished)
},
@@ -579,6 +567,7 @@ func TestExecutorShutdown(t *testing.T) {
ExitFunc: func(_ int) {
atomic.AddInt32(&exitCalled, 1)
},
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
}
executor := New(config)
@@ -594,11 +583,21 @@ func TestExecutorShutdown(t *testing.T) {
assert.Equal(t, true, executor.isDone(),
"executor should be in Done state after Shutdown")
// channel should be closed now, only a constant number of updates left
num := len(updates)
drainLoop:
for {
select {
case <-executor.Done():
default:
t.Fatal("done channel should be closed after shutdown")
case _, ok := <-updates:
if !ok {
break drainLoop
}
num -= 1
default:
t.Fatal("Updates chan should be closed after Shutdown")
}
}
assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown")
assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0,
"the executor should call its ExitFunc when it is ready to close down")
@@ -608,7 +607,7 @@ func TestExecutorShutdown(t *testing.T) {
func TestExecutorsendFrameworkMessage(t *testing.T) {
mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor()
executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil)
@@ -623,7 +622,7 @@ func TestExecutorsendFrameworkMessage(t *testing.T) {
// guard against data race in mock driver between AssertExpectations and Called
select {
case <-called: // expected
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("expected call to SendFrameworkMessage")
}
mockDriver.AssertExpectations(t)

View File

@@ -22,7 +22,9 @@ import (
"github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
type MockExecutorDriver struct {
@@ -64,16 +66,18 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
return args.Get(0).(mesosproto.Status), args.Error(1)
}
func NewTestKubernetesExecutor() *KubernetesExecutor {
func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) {
updates := make(chan kubetypes.PodUpdate, 1024)
return New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1024),
})
Updates: updates,
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
}), updates
}
func TestExecutorNew(t *testing.T) {
mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor()
executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver)
assert.Equal(t, executor.isDone(), false, "executor should not be in Done state on initialization")

View File

@@ -0,0 +1,83 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
log "github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
)
// executorKubelet decorates the kubelet with a Run function that notifies the
// executor by closing kubeletDone before blocking forever.
type executorKubelet struct {
*kubelet.Kubelet
kubeletDone chan<- struct{} // closed once kubelet.Run() returns
executorDone <-chan struct{} // closed when executor terminates
}
// Run runs the main kubelet loop, closing the kubeletDone chan when the
// loop exits. Like the upstream Run, it will never return.
func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
defer func() {
// When this Run function is invoked, we close kubeletDone here. Otherwise,
// KubeletExecutorServer.runKubelet is responsible for closing it.
close(kl.kubeletDone)
util.HandleCrash()
log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity
// important: never return! this is in our contract
select {}
}()
// push merged updates into another, closable update channel which is closed
// when the executor shuts down.
closableUpdates := make(chan kubetypes.PodUpdate)
go func() {
// closing closableUpdates will cause our patched kubelet's syncLoop() to exit
defer close(closableUpdates)
pipeLoop:
for {
select {
case <-kl.executorDone:
break pipeLoop
default:
select {
case u := <-mergedUpdates:
select {
case closableUpdates <- u: // noop
case <-kl.executorDone:
break pipeLoop
}
case <-kl.executorDone:
break pipeLoop
}
}
}
}()
// we expect that Run() will complete after closableUpdates is closed and the
// kubelet's syncLoop() has finished processing its backlog, which hopefully
// will not take very long. Peeking into the future (current k8s master) it
// seems that the backlog has grown from 1 to 50 -- this may negatively impact
// us going forward, time will tell.
util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)
//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
}
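The forwarding goroutine above is a generic "closable pipe": mergedUpdates is owned by the pod-config merger and must never be closed here, so updates are copied into a channel this code does own and can close, which is what unblocks the kubelet's syncLoop. The same shape in isolation (a simplified sketch, not the PR's code):

package main

import "fmt"

func pipeUntil(done <-chan struct{}, in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out) // closing out lets the consumer's range/syncLoop exit
		for {
			select {
			case <-done:
				return
			case u := <-in:
				select {
				case out <- u:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	in := make(chan string)
	out := pipeUntil(done, in)

	go func() {
		in <- "pod update 1"
		close(done) // simulate executor shutdown
	}()

	for u := range out { // exits once the pipe closes out
		fmt.Println("received:", u)
	}
	fmt.Println("pipe closed, consumer loop exited")
}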

View File

@@ -18,13 +18,9 @@ package service
import (
"fmt"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@@ -38,19 +34,11 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kconfig "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
utilio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
)
const (
@@ -63,6 +51,9 @@ type KubeletExecutorServer struct {
*app.KubeletServer
SuicideTimeout time.Duration
LaunchGracePeriod time.Duration
kletLock sync.Mutex // TODO(sttts): remove necessity to access the kubelet from the executor
klet *kubelet.Kubelet
}
func NewKubeletExecutorServer() *KubeletExecutorServer {
@@ -87,19 +78,152 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.")
}
// Run runs the specified KubeletExecutorServer.
func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
rand.Seed(time.Now().UTC().UnixNano())
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
exec := executor.New(executor.Config{
Updates: execUpdates,
APIClient: apiclient,
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
s.kletLock.Lock()
defer s.kletLock.Unlock()
oomAdjuster := oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil {
log.Info(err)
if s.klet == nil {
return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized")
}
// empty string for the docker and system containers (= cgroup paths). This
// stops the kubelet from taking any control over other system processes.
s.SystemContainer = ""
s.DockerDaemonContainer = ""
status, err := s.klet.GetRuntime().GetPodStatus(pod)
if err != nil {
return nil, err
}
status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
hostIP, err := s.klet.GetHostIP()
if err != nil {
log.Errorf("Cannot get host IP: %v", err)
} else {
status.HostIP = hostIP.String()
}
return status, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
),
})
// initialize driver and initialize the executor with it
dconfig := bindings.DriverConfig{
Executor: exec,
HostnameOverride: s.HostnameOverride,
BindingAddress: s.Address,
}
driver, err := bindings.NewMesosExecutorDriver(dconfig)
if err != nil {
return fmt.Errorf("failed to create executor driver: %v", err)
}
log.V(2).Infof("Initialize executor driver...")
exec.Init(driver)
// start the driver
go func() {
if _, err := driver.Run(); err != nil {
log.Fatalf("executor driver failed: %v", err)
}
log.Info("executor Run completed")
}()
return nil
}
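Note how PodStatusFunc is handed to the executor before the kubelet exists; the kletLock/nil check is what makes early calls fail cleanly instead of dereferencing a nil kubelet. Reduced to its essentials (hypothetical names, not the PR's code):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type fakeKubelet struct{}

func (k *fakeKubelet) podStatus(name string) (string, error) { return name + ": Running", nil }

type server struct {
	mu   sync.Mutex
	klet *fakeKubelet // nil until the kubelet goroutine stores it
}

// status fails cleanly when called before the kubelet has been created,
// mirroring the guard inside PodStatusFunc above.
func (s *server) status(pod string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.klet == nil {
		return "", errors.New("status requested before kubelet is initialized")
	}
	return s.klet.podStatus(pod)
}

func main() {
	s := &server{}
	if _, err := s.status("nginx"); err != nil {
		fmt.Println("early call:", err)
	}
	s.mu.Lock()
	s.klet = &fakeKubelet{}
	s.mu.Unlock()
	st, _ := s.status("nginx")
	fmt.Println("after init:", st)
}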
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
kcfg, err := s.UnsecuredKubeletConfig()
if err == nil {
// apply Mesos-specific settings
executorDone := make(chan struct{})
kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
k, pc, err := app.CreateAndInitKubelet(kc)
if err != nil {
return k, pc, err
}
klet := k.(*kubelet.Kubelet)
s.kletLock.Lock()
s.klet = klet
s.kletLock.Unlock()
// decorate the kubelet such that it shuts down when the executor is done
decorated := &executorKubelet{
Kubelet: klet,
kubeletDone: kubeletDone,
executorDone: executorDone,
}
return decorated, pc, nil
}
kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup
kcfg.Hostname = kcfg.HostnameOverride
kcfg.KubeClient = apiclient
kcfg.NodeName = kcfg.HostnameOverride
kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
kcfg.StandaloneMode = false
kcfg.SystemContainer = "" // don't take control over other system processes.
if kcfg.Cloud != nil {
// fail early and hard because a loaded cloud provider would go unnoticed,
// but would break bigger clusters: every slave fetching state.json from the master kills it.
panic("cloud provider must not be set")
}
// create main pod source
updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE)
go func() {
// execUpdates will be closed by the executor on shutdown
defer close(executorDone)
for u := range execUpdates {
u.Source = MESOS_CFG_SOURCE
updates <- u
}
}()
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
// run the kubelet, until execUpdates is closed
// NOTE: because kcfg != nil holds, the upstream Run function will not
// initialize the cloud provider. We explicitly wouldn't want
// that because then every kubelet instance would query the master
// state.json which does not scale.
err = s.KubeletServer.Run(kcfg)
}
if err != nil {
// close the channel here. When Run returns without an error, the executorKubelet
// is responsible for doing this. If it returns with an error, we are responsible here.
close(kubeletDone)
}
return err
}
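The goroutine that copies execUpdates into the merged channel doubles as the shutdown signal: when the executor closes execUpdates on shutdown, the loop ends and executorDone closes, which stops the decorated kubelet. A reduced sketch of that bridge (illustrative types, not the PR's):

package main

import "fmt"

type podUpdate struct {
	Source string
	Pods   []string
}

func bridge(execUpdates <-chan podUpdate, merged chan<- podUpdate, sourceName string) <-chan struct{} {
	executorDone := make(chan struct{})
	go func() {
		// execUpdates is closed by the executor on shutdown; that in turn
		// closes executorDone, which tells the decorated kubelet to stop.
		defer close(executorDone)
		for u := range execUpdates {
			u.Source = sourceName
			merged <- u
		}
	}()
	return executorDone
}

func main() {
	exec := make(chan podUpdate, 1)
	merged := make(chan podUpdate, 1)
	done := bridge(exec, merged, "mesos")

	exec <- podUpdate{Pods: []string{"nginx"}}
	fmt.Println("relabeled source:", (<-merged).Source) // "mesos"

	close(exec) // executor shutdown
	<-done      // bridge signals the kubelet side
	fmt.Println("executor done observed")
}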
// Run runs the specified KubeletExecutorServer.
func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
// create shared channels
kubeletFinished := make(chan struct{})
execUpdates := make(chan kubetypes.PodUpdate, 1)
// create static pods directory
staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods")
err := os.Mkdir(staticPodsConfigPath, 0750)
if err != nil {
return err
}
// create apiserver client
var apiclient *client.Client
@@ -108,167 +232,18 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
apiclient, err = client.New(clientConfig)
}
if err != nil {
// required for k8sm since we need to send api.Binding information
// back to the apiserver
log.Fatalf("No API client: %v", err)
// required for k8sm since we need to send api.Binding information back to the apiserver
return fmt.Errorf("cannot create API client: %v", err)
}
log.Infof("Using root directory: %v", s.RootDirectory)
credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
cAdvisorInterface, err := cadvisor.New(s.CAdvisorPort)
// start executor
err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
if err != nil {
return err
}
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: s.ImageGCHighThresholdPercent,
LowThresholdPercent: s.ImageGCLowThresholdPercent,
}
diskSpacePolicy := kubelet.DiskSpacePolicy{
DockerFreeDiskMB: s.LowDiskSpaceThresholdMB,
RootFreeDiskMB: s.LowDiskSpaceThresholdMB,
}
//TODO(jdef) intentionally NOT initializing a cloud provider here since:
//(a) the kubelet doesn't actually use it
//(b) we don't need to create N-kubelet connections to zookeeper for no good reason
//cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
//log.Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
if err != nil {
return err
}
hostPIDSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostPIDSources, ","))
if err != nil {
return err
}
hostIPCSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostIPCSources, ","))
if err != nil {
return err
}
tlsOptions, err := s.InitializeTLS()
if err != nil {
return err
}
mounter := mount.New()
if s.Containerized {
log.V(2).Info("Running kubelet in containerized mode (experimental)")
mounter = &mount.NsenterMounter{}
}
var writer utilio.Writer = &utilio.StdWriter{}
var dockerExecHandler dockertools.ExecHandler
switch s.DockerExecHandlerName {
case "native":
dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
writer = &utilio.NsenterWriter{}
dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}
manifestURLHeader := make(http.Header)
if s.ManifestURLHeader != "" {
pieces := strings.Split(s.ManifestURLHeader, ":")
if len(pieces) != 2 {
return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
kcfg := app.KubeletConfig{
Address: s.Address,
AllowPrivileged: s.AllowPrivileged,
CAdvisorInterface: cAdvisorInterface,
CgroupRoot: s.CgroupRoot,
Cloud: nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl
ClusterDNS: s.ClusterDNS,
ClusterDomain: s.ClusterDomain,
// ConfigFile: ""
ConfigureCBR0: s.ConfigureCBR0,
ContainerRuntime: s.ContainerRuntime,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
DockerDaemonContainer: s.DockerDaemonContainer,
DockerExecHandler: dockerExecHandler,
EnableDebuggingHandlers: s.EnableDebuggingHandlers,
EnableServer: s.EnableServer,
EventBurst: s.EventBurst,
EventRecordQPS: s.EventRecordQPS,
FileCheckFrequency: s.FileCheckFrequency,
HostnameOverride: s.HostnameOverride,
HostNetworkSources: hostNetworkSources,
HostPIDSources: hostPIDSources,
HostIPCSources: hostIPCSources,
// HTTPCheckFrequency
ImageGCPolicy: imageGCPolicy,
KubeClient: apiclient,
// ManifestURL: ""
ManifestURLHeader: manifestURLHeader,
MasterServiceNamespace: s.MasterServiceNamespace,
MaxContainerCount: s.MaxContainerCount,
MaxOpenFiles: s.MaxOpenFiles,
MaxPerPodContainerCount: s.MaxPerPodContainerCount,
MaxPods: s.MaxPods,
MinimumGCAge: s.MinimumGCAge,
Mounter: mounter,
NetworkPluginName: s.NetworkPluginName,
NetworkPlugins: app.ProbeNetworkPlugins(s.NetworkPluginDir),
NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency,
OOMAdjuster: oomAdjuster,
OSInterface: kubecontainer.RealOS{},
PodCIDR: s.PodCIDR,
PodInfraContainerImage: s.PodInfraContainerImage,
Port: s.Port,
ReadOnlyPort: s.ReadOnlyPort,
RegisterNode: s.RegisterNode,
RegistryBurst: s.RegistryBurst,
RegistryPullQPS: s.RegistryPullQPS,
ResolverConfig: s.ResolverConfig,
ResourceContainer: s.ResourceContainer,
RootDirectory: s.RootDirectory,
Runonce: s.RunOnce,
// StandaloneMode: false
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout,
SyncFrequency: s.SyncFrequency,
SystemContainer: s.SystemContainer,
TLSOptions: tlsOptions,
VolumePlugins: app.ProbeVolumePlugins(),
Writer: writer,
}
kcfg.NodeName = kcfg.Hostname
kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
return s.createAndInitKubelet(kc, hks, clientConfig)
})
err = app.RunKubelet(&kcfg)
if err != nil {
return err
}
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go util.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)
if err != nil {
log.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, util.NeverStop)
}
// block until executor is shut down or commits shutdown
select {}
// start kubelet, blocking
return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
}
func defaultBindingAddress() string {
@@ -279,222 +254,3 @@ func defaultBindingAddress() string {
return libProcessIP
}
}
func (ks *KubeletExecutorServer) createAndInitKubelet(
kc *app.KubeletConfig,
hks hyperkube.Interface,
clientConfig *client.Config,
) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
// TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
// a nil pointer to it when what we really want is a nil interface.
var kubeClient client.Interface
if kc.KubeClient == nil {
kubeClient = nil
} else {
kubeClient = kc.KubeClient
}
gcPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,
}
pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder)
updates := pc.Channel(MESOS_CFG_SOURCE)
klet, err := kubelet.NewMainKubelet(
kc.Hostname,
kc.NodeName,
kc.DockerClient,
kubeClient,
kc.RootDirectory,
kc.PodInfraContainerImage,
kc.SyncFrequency,
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.EventRecordQPS,
kc.EventBurst,
gcPolicy,
pc.SeenAllSources,
kc.RegisterNode,
kc.RegisterSchedulable,
kc.StandaloneMode,
kc.ClusterDomain,
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.NetworkPlugins,
kc.NetworkPluginName,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
kc.CAdvisorInterface,
kc.ImageGCPolicy,
kc.DiskSpacePolicy,
kc.Cloud,
kc.NodeStatusUpdateFrequency,
kc.ResourceContainer,
kc.OSInterface,
kc.CgroupRoot,
kc.ContainerRuntime,
kc.RktPath,
kc.RktStage1Image,
kc.Mounter,
kc.Writer,
kc.DockerDaemonContainer,
kc.SystemContainer,
kc.ConfigureCBR0,
kc.PodCIDR,
kc.ReconcileCIDR,
kc.MaxPods,
kc.DockerExecHandler,
kc.ResolverConfig,
kc.CPUCFSQuota,
&api.NodeDaemonEndpoints{
KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)},
},
kc.OOMAdjuster,
)
if err != nil {
return nil, nil, err
}
//TODO(jdef) either configure Watch here with something useful, or else
// get rid of it from executor.Config
kubeletFinished := make(chan struct{})
staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods")
exec := executor.New(executor.Config{
Kubelet: klet,
Updates: updates,
SourceName: MESOS_CFG_SOURCE,
APIClient: kc.KubeClient,
Docker: kc.DockerClient,
SuicideTimeout: ks.SuicideTimeout,
LaunchGracePeriod: ks.LaunchGracePeriod,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return klet.GetRuntime().GetPodStatus(pod)
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),
})
go exec.InitializeStaticPodsSource(func() {
// Create file source only when we are called back. Otherwise, it is never marked as seen.
fileSourceUpdates := pc.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates)
})
k := &kubeletExecutor{
Kubelet: klet,
address: ks.Address,
dockerClient: kc.DockerClient,
hks: hks,
kubeletFinished: kubeletFinished,
executorDone: exec.Done(),
clientConfig: clientConfig,
}
dconfig := bindings.DriverConfig{
Executor: exec,
HostnameOverride: ks.HostnameOverride,
BindingAddress: ks.Address,
}
if driver, err := bindings.NewMesosExecutorDriver(dconfig); err != nil {
log.Fatalf("failed to create executor driver: %v", err)
} else {
k.driver = driver
}
log.V(2).Infof("Initialize executor driver...")
k.BirthCry()
exec.Init(k.driver)
k.StartGarbageCollection()
return k, pc, nil
}
// kubelet decorator
type kubeletExecutor struct {
*kubelet.Kubelet
initialize sync.Once
driver bindings.ExecutorDriver
address net.IP
dockerClient dockertools.DockerInterface
hks hyperkube.Interface
kubeletFinished chan struct{} // closed once kubelet.Run() returns
executorDone <-chan struct{} // from KubeletExecutor.Done()
clientConfig *client.Config
}
func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) {
// this func could be called many times, depending how often the HTTP server crashes,
// so only execute certain initialization procs once
kl.initialize.Do(func() {
go func() {
if _, err := kl.driver.Run(); err != nil {
log.Fatalf("executor driver failed: %v", err)
}
log.Info("executor Run completed")
}()
})
log.Infof("Starting kubelet server...")
kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers)
}
// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits.
// never returns.
func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) {
defer func() {
close(kl.kubeletFinished)
util.HandleCrash()
log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity
// important: never return! this is in our contract
select {}
}()
// push updates through a closable pipe. when the executor indicates shutdown
// via Done() we want to stop the Kubelet from processing updates.
pipe := make(chan kubetypes.PodUpdate)
go func() {
// closing pipe will cause our patched kubelet's syncLoop() to exit
defer close(pipe)
pipeLoop:
for {
select {
case <-kl.executorDone:
break pipeLoop
default:
select {
case u := <-updates:
select {
case pipe <- u: // noop
case <-kl.executorDone:
break pipeLoop
}
case <-kl.executorDone:
break pipeLoop
}
}
}
}()
// we expect that Run() will complete after the pipe is closed and the
// kubelet's syncLoop() has finished processing its backlog, which hopefully
// will not take very long. Peeking into the future (current k8s master) it
// seems that the backlog has grown from 1 to 50 -- this may negatively impact
// us going forward, time will tell.
util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone)
//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
}

View File

@@ -67,7 +67,7 @@ func (t *suicideTracker) makeJumper(_ jumper) jumper {
func TestSuicide_zeroTimeout(t *testing.T) {
defer glog.Flush()
k := New(Config{})
k, _ := NewTestKubernetesExecutor()
tracker := &suicideTracker{suicideWatcher: k.suicideWatch}
k.suicideWatch = tracker
@@ -92,9 +92,8 @@ func TestSuicide_zeroTimeout(t *testing.T) {
func TestSuicide_WithTasks(t *testing.T) {
defer glog.Flush()
k := New(Config{
SuicideTimeout: 50 * time.Millisecond,
})
k, _ := NewTestKubernetesExecutor()
k.suicideTimeout = 50 * time.Millisecond
jumps := uint32(0)
tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps}