Remove redundant kubelet dependency of executor

This commit is contained in:
Dr. Stefan Schimanski 2015-08-21 15:23:44 +02:00
parent 6af86cbaad
commit 686b767f28
3 changed files with 34 additions and 57 deletions

View File

@ -19,7 +19,6 @@ package executor
import (
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
@ -94,15 +93,9 @@ type kuberTask struct {
type podStatusFunc func() (*api.PodStatus, error)
// KubeletInterface consists of the kubelet.Kubelet API's that we actually use
type KubeletInterface interface {
GetHostIP() (net.IP, error)
}
// KubernetesExecutor is an mesos executor that runs pods
// in a minion machine.
type KubernetesExecutor struct {
kl KubeletInterface // the kubelet instance.
updateChan chan<- interface{} // to send pod config updates to the kubelet
state stateType
tasks map[string]*kuberTask
@ -119,8 +112,7 @@ type KubernetesExecutor struct {
kubeletFinished <-chan struct{} // signals that kubelet Run() died
initialRegistration sync.Once
exitFunc func(int)
podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
staticPodsConfig []byte
podStatusFunc func(*api.Pod) (*api.PodStatus, error)
staticPodsConfigPath string
initialRegComplete chan struct{}
podController *framework.Controller
@ -128,7 +120,6 @@ type KubernetesExecutor struct {
}
type Config struct {
Kubelet KubeletInterface
Updates chan<- interface{} // to send pod config updates to the kubelet
SourceName string
APIClient *client.Client
@ -137,7 +128,7 @@ type Config struct {
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
PodStatusFunc func(*api.Pod) (*api.PodStatus, error)
StaticPodsConfigPath string
PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic
LaunchGracePeriod time.Duration
@ -150,7 +141,6 @@ func (k *KubernetesExecutor) isConnected() bool {
// New creates a new kubernetes executor.
func New(config Config) *KubernetesExecutor {
k := &KubernetesExecutor{
kl: config.Kubelet,
updateChan: config.Updates,
state: disconnectedState,
tasks: make(map[string]*kuberTask),
@ -196,6 +186,10 @@ func New(config Config) *KubernetesExecutor {
return k
}
func (k *KubernetesExecutor) InitialRegComplete() <-chan struct{} {
return k.initialRegComplete
}
func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
k.killKubeletContainers()
k.resetSuicideWatch(driver)
@ -231,7 +225,7 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
}
if executorInfo != nil && executorInfo.Data != nil {
k.staticPodsConfig = executorInfo.Data
k.initializeStaticPodsSource(executorInfo.Data)
}
if slaveInfo != nil {
@ -281,24 +275,14 @@ func (k *KubernetesExecutor) onInitialRegistration() {
}
}
// InitializeStaticPodsSource blocks until initial regstration is complete and
// then creates a static pod source using the given factory func.
func (k *KubernetesExecutor) InitializeStaticPodsSource(sourceFactory func()) {
<-k.initialRegComplete
if k.staticPodsConfig == nil {
return
}
// initializeStaticPodsSource unzips the data slice into the static-pods directory
func (k *KubernetesExecutor) initializeStaticPodsSource(data []byte) {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
err := archive.UnzipDir(k.staticPodsConfig, k.staticPodsConfigPath)
err := archive.UnzipDir(data, k.staticPodsConfigPath)
if err != nil {
log.Errorf("Failed to extract static pod config: %v", err)
return
}
log.V(2).Infof("initializing static pods source factory, configured at path %q", k.staticPodsConfigPath)
sourceFactory()
}
// Disconnected is called when the executor is disconnected from the slave.
@ -597,18 +581,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
// Delay reporting 'task running' until container is up.
psf := podStatusFunc(func() (*api.PodStatus, error) {
status, err := k.podStatusFunc(k.kl, pod)
if err != nil {
return nil, err
}
status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
hostIP, err := k.kl.GetHostIP()
if err != nil {
log.Errorf("Cannot get host IP: %v", err)
} else {
status.HostIP = hostIP.String()
}
return status, nil
return k.podStatusFunc(pod)
})
go k._launchTask(driver, taskId, podFullName, psf)

View File

@ -144,10 +144,6 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
}),
Kubelet: &fakeKubelet{
Kubelet: &kubelet.Kubelet{},
hostIP: net.IPv4(127, 0, 0, 1),
},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
@ -159,6 +155,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
},
},
Phase: api.PodRunning,
HostIP: "127.0.0.1",
}, nil
},
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
@ -311,7 +308,6 @@ func TestExecutorStaticPods(t *testing.T) {
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
}),
Kubelet: &kubelet.Kubelet{},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
@ -393,10 +389,6 @@ func TestExecutorFrameworkMessage(t *testing.T) {
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
}),
Kubelet: &fakeKubelet{
Kubelet: &kubelet.Kubelet{},
hostIP: net.IPv4(127, 0, 0, 1),
},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
@ -408,6 +400,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
},
},
Phase: api.PodRunning,
HostIP: "127.0.0.1",
}, nil
},
ShutdownAlert: func() {

View File

@ -368,17 +368,28 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
kubeletFinished := make(chan struct{})
staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods")
exec := executor.New(executor.Config{
Kubelet: klet,
Updates: updates,
SourceName: MESOS_CFG_SOURCE,
APIClient: kc.KubeClient,
Docker: kc.DockerClient,
SuicideTimeout: ks.SuicideTimeout,
Updates: updates,
SourceName: MESOS_CFG_SOURCE,
APIClient: kc.KubeClient,
Docker: kc.DockerClient,
SuicideTimeout: ks.SuicideTimeout,
LaunchGracePeriod: ks.LaunchGracePeriod,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return klet.GetRuntime().GetPodStatus(pod)
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
status, err := klet.GetRuntime().GetPodStatus(pod)
if err != nil {
return nil, err
}
status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
hostIP, err := klet.GetHostIP()
if err != nil {
log.Errorf("Cannot get host IP: %v", err)
} else {
status.HostIP = hostIP.String()
}
return status, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),