diff --git a/contrib/mesos/pkg/archive/doc.go b/contrib/mesos/pkg/archive/doc.go new file mode 100644 index 00000000000..a4e311de230 --- /dev/null +++ b/contrib/mesos/pkg/archive/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package archive provides utilities to archive and unarchive filesystem +// hierarchies. +package archive diff --git a/contrib/mesos/pkg/archive/zip.go b/contrib/mesos/pkg/archive/zip.go new file mode 100644 index 00000000000..792ddcbaeb8 --- /dev/null +++ b/contrib/mesos/pkg/archive/zip.go @@ -0,0 +1,137 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package archive + +import ( + "archive/zip" + "bytes" + "fmt" + "io" + "os" + "path" + "path/filepath" +) + +// ZipWalker returns a filepath.WalkFunc that adds every filesystem node +// to the given *zip.Writer. +func ZipWalker(zw *zip.Writer) filepath.WalkFunc { + var base string + return func(path string, info os.FileInfo, err error) error { + if base == "" { + base = path + } + + header, err := zip.FileInfoHeader(info) + if err != nil { + return err + } + + if header.Name, err = filepath.Rel(base, path); err != nil { + return err + } else if info.IsDir() { + header.Name = header.Name + string(filepath.Separator) + } else { + header.Method = zip.Deflate + } + + w, err := zw.CreateHeader(header) + if err != nil { + return err + } + + if info.IsDir() { + return nil + } + + f, err := os.Open(path) + if err != nil { + return err + } + + _, err = io.Copy(w, f) + f.Close() + return err + } +} + +// Create a zip of all files in a directory recursively, return a byte array and +// the number of files archived. +func ZipDir(path string) ([]byte, int, error) { + var buf bytes.Buffer + zw := zip.NewWriter(&buf) + zipWalker := ZipWalker(zw) + numberManifests := 0 + err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error { + if !info.IsDir() { + numberManifests++ + } + return zipWalker(path, info, err) + })) + + if err != nil { + return nil, 0, err + } else if err = zw.Close(); err != nil { + return nil, 0, err + } + return buf.Bytes(), numberManifests, nil +} + +// UnzipDir unzips all files from a given zip byte array into a given directory. +// The directory is created if it does not exist yet. +func UnzipDir(data []byte, destPath string) error { + // open zip + zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) + if err != nil { + return fmt.Errorf("Unzip archive read error: %v", err) + } + + for _, file := range zr.File { + // skip directories + if file.FileInfo().IsDir() { + continue + } + + // open file + rc, err := file.Open() + defer rc.Close() + if err != nil { + return fmt.Errorf("Unzip file read error: %v", err) + } + + // make sure the directory of the file exists, otherwise create + destPath := filepath.Clean(filepath.Join(destPath, file.Name)) + destBasedir := path.Dir(destPath) + err = os.MkdirAll(destBasedir, 0755) + if err != nil { + return fmt.Errorf("Unzip mkdir error: %v", err) + } + + // create file + f, err := os.Create(destPath) + if err != nil { + return fmt.Errorf("Unzip file creation error: %v", err) + } + defer f.Close() + + // write file + if _, err := io.Copy(f, rc); err != nil { + return fmt.Errorf("Unzip file write error: %v", err) + } + } + + return nil +} diff --git a/contrib/mesos/pkg/archive/zip_test.go b/contrib/mesos/pkg/archive/zip_test.go new file mode 100644 index 00000000000..1889a124d5f --- /dev/null +++ b/contrib/mesos/pkg/archive/zip_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package archive + +import ( + "archive/zip" + "bytes" + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +func TestZipWalker(t *testing.T) { + dir, err := ioutil.TempDir(os.TempDir(), "") + if err != nil { + t.Fatal(err) + } + + tree := map[string]string{"a/b/c": "12345", "a/b/d": "54321", "a/e": "00000"} + for path, content := range tree { + path = filepath.Join(dir, path) + if err := os.MkdirAll(filepath.Dir(path), os.ModeTemporary|0700); err != nil { + t.Fatal(err) + } else if err = ioutil.WriteFile(path, []byte(content), 0700); err != nil { + t.Fatal(err) + } + } + + var buf bytes.Buffer + zw := zip.NewWriter(&buf) + if err := filepath.Walk(dir, ZipWalker(zw)); err != nil { + t.Fatal(err) + } else if err = zw.Close(); err != nil { + t.Fatal(err) + } + + zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + if err != nil { + t.Fatal(err) + } + + for _, file := range zr.File { + if rc, err := file.Open(); err != nil { + t.Fatal(err) + } else if got, err := ioutil.ReadAll(rc); err != nil { + t.Error(err) + } else if want := []byte(tree[file.Name]); !bytes.Equal(got, want) { + t.Errorf("%s\ngot: %s\nwant: %s", file.Name, got, want) + } + } +} diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 62c2bba44bd..c8da632abd2 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "time" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/messages" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -106,39 +107,43 @@ type KubeletInterface interface { // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - kl KubeletInterface // the kubelet instance. - updateChan chan<- interface{} // to send pod config updates to the kubelet - state stateType - tasks map[string]*kuberTask - pods map[string]*api.Pod - lock sync.RWMutex - sourcename string - client *client.Client - events <-chan watch.Event - done chan struct{} // signals shutdown - outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver - dockerClient dockertools.DockerInterface - suicideWatch suicideWatcher - suicideTimeout time.Duration - shutdownAlert func() // invoked just prior to executor shutdown - kubeletFinished <-chan struct{} // signals that kubelet Run() died - initialRegistration sync.Once - exitFunc func(int) - podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) + kl KubeletInterface // the kubelet instance. + updateChan chan<- interface{} // to send pod config updates to the kubelet + state stateType + tasks map[string]*kuberTask + pods map[string]*api.Pod + lock sync.RWMutex + sourcename string + client *client.Client + events <-chan watch.Event + done chan struct{} // signals shutdown + outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver + dockerClient dockertools.DockerInterface + suicideWatch suicideWatcher + suicideTimeout time.Duration + shutdownAlert func() // invoked just prior to executor shutdown + kubeletFinished <-chan struct{} // signals that kubelet Run() died + initialRegistration sync.Once + exitFunc func(int) + podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) + staticPodsConfig []byte + staticPodsConfigPath string + initialRegComplete chan struct{} } type Config struct { - Kubelet KubeletInterface - Updates chan<- interface{} // to send pod config updates to the kubelet - SourceName string - APIClient *client.Client - Watch watch.Interface - Docker dockertools.DockerInterface - ShutdownAlert func() - SuicideTimeout time.Duration - KubeletFinished <-chan struct{} // signals that kubelet Run() died - ExitFunc func(int) - PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) + Kubelet KubeletInterface + Updates chan<- interface{} // to send pod config updates to the kubelet + SourceName string + APIClient *client.Client + Watch watch.Interface + Docker dockertools.DockerInterface + ShutdownAlert func() + SuicideTimeout time.Duration + KubeletFinished <-chan struct{} // signals that kubelet Run() died + ExitFunc func(int) + PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) + StaticPodsConfigPath string } func (k *KubernetesExecutor) isConnected() bool { @@ -148,22 +153,24 @@ func (k *KubernetesExecutor) isConnected() bool { // New creates a new kubernetes executor. func New(config Config) *KubernetesExecutor { k := &KubernetesExecutor{ - kl: config.Kubelet, - updateChan: config.Updates, - state: disconnectedState, - tasks: make(map[string]*kuberTask), - pods: make(map[string]*api.Pod), - sourcename: config.SourceName, - client: config.APIClient, - done: make(chan struct{}), - outgoing: make(chan func() (mesos.Status, error), 1024), - dockerClient: config.Docker, - suicideTimeout: config.SuicideTimeout, - kubeletFinished: config.KubeletFinished, - suicideWatch: &suicideTimer{}, - shutdownAlert: config.ShutdownAlert, - exitFunc: config.ExitFunc, - podStatusFunc: config.PodStatusFunc, + kl: config.Kubelet, + updateChan: config.Updates, + state: disconnectedState, + tasks: make(map[string]*kuberTask), + pods: make(map[string]*api.Pod), + sourcename: config.SourceName, + client: config.APIClient, + done: make(chan struct{}), + outgoing: make(chan func() (mesos.Status, error), 1024), + dockerClient: config.Docker, + suicideTimeout: config.SuicideTimeout, + kubeletFinished: config.KubeletFinished, + suicideWatch: &suicideTimer{}, + shutdownAlert: config.ShutdownAlert, + exitFunc: config.ExitFunc, + podStatusFunc: config.PodStatusFunc, + initialRegComplete: make(chan struct{}), + staticPodsConfigPath: config.StaticPodsConfigPath, } //TODO(jdef) do something real with these events.. if config.Watch != nil { @@ -212,6 +219,11 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, if !(&k.state).transition(disconnectedState, connectedState) { log.Errorf("failed to register/transition to a connected state") } + + if executorInfo != nil && executorInfo.Data != nil { + k.staticPodsConfig = executorInfo.Data + } + k.initialRegistration.Do(k.onInitialRegistration) } @@ -225,10 +237,12 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI if !(&k.state).transition(disconnectedState, connectedState) { log.Errorf("failed to reregister/transition to a connected state") } + k.initialRegistration.Do(k.onInitialRegistration) } func (k *KubernetesExecutor) onInitialRegistration() { + defer close(k.initialRegComplete) // emit an empty update to allow the mesos "source" to be marked as seen k.updateChan <- kubelet.PodUpdate{ Pods: []*api.Pod{}, @@ -237,6 +251,26 @@ func (k *KubernetesExecutor) onInitialRegistration() { } } +// InitializeStaticPodsSource blocks until initial regstration is complete and +// then creates a static pod source using the given factory func. +func (k *KubernetesExecutor) InitializeStaticPodsSource(sourceFactory func()) { + <-k.initialRegComplete + + if k.staticPodsConfig == nil { + return + } + + log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) + err := archive.UnzipDir(k.staticPodsConfig, k.staticPodsConfigPath) + if err != nil { + log.Errorf("Failed to extract static pod config: %v", err) + return + } + + log.V(2).Infof("initializing static pods source factory, configured at path %q", k.staticPodsConfigPath) + sourceFactory() +} + // Disconnected is called when the executor is disconnected from the slave. func (k *KubernetesExecutor) Disconnected(driver bindings.ExecutorDriver) { if k.isDone() { @@ -772,7 +806,6 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) { case <-time.After(15 * time.Second): log.Errorf("timed out waiting for kubelet Run() to die") } - log.Infoln("exiting") if k.exitFunc != nil { k.exitFunc(0) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index b94deaee182..d38b9936bec 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -17,10 +17,14 @@ limitations under the License. package executor import ( + "archive/zip" + "bytes" "fmt" + "io/ioutil" "net" "net/http" "net/http/httptest" + "os" "reflect" "sync" "sync/atomic" @@ -36,6 +40,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -43,6 +48,7 @@ import ( "github.com/golang/glog" bindings "github.com/mesos/mesos-go/executor" "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -420,6 +426,126 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { mockDriver.AssertExpectations(t) } +// TestExecutorStaticPods test that the ExecutorInfo.data is parsed +// as a zip archive with pod definitions. +func TestExecutorStaticPods(t *testing.T) { + // create some zip with static pod definition + var buf bytes.Buffer + zw := zip.NewWriter(&buf) + createStaticPodFile := func(fileName, id, name string) { + w, err := zw.Create(fileName) + assert.NoError(t, err) + spod := `{ + "apiVersion": "v1beta3", + "name": "%v", + "kind": "Pod", + "metadata": { + "labels": { "name": "foo", "cluster": "bar" } + }, + "spec": { + "containers": [{ + "name": "%v", + "image": "library/nginx", + "ports": [{ "containerPort": 80, "name": "http" }], + "livenessProbe": { + "enabled": true, + "type": "http", + "initialDelaySeconds": 30, + "httpGet": { "path": "/", "port": "80" } + } + }] + } + }` + _, err = w.Write([]byte(fmt.Sprintf(spod, id, name))) + assert.NoError(t, err) + } + createStaticPodFile("spod.json", "spod-id-01", "spod-01") + createStaticPodFile("spod2.json", "spod-id-02", "spod-02") + createStaticPodFile("dir/spod.json", "spod-id-03", "spod-03") // same file name as first one to check for overwriting + + expectedStaticPodsNum := 2 // subdirectories are ignored by FileSource, hence only 2 + + err := zw.Close() + assert.NoError(t, err) + + // create fake apiserver + testApiServer := NewTestServer(t, api.NamespaceDefault, nil) + defer testApiServer.server.Close() + + // temporary directory which is normally located in the executor sandbox + staticPodsConfigPath, err := ioutil.TempDir("/tmp", "executor-k8sm-archive") + assert.NoError(t, err) + defer os.RemoveAll(staticPodsConfigPath) + + mockDriver := &MockExecutorDriver{} + updates := make(chan interface{}, 1024) + config := Config{ + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init + APIClient: client.NewOrDie(&client.Config{ + Host: testApiServer.server.URL, + Version: testapi.Version(), + }), + Kubelet: &kubelet.Kubelet{}, + PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + return &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "foo", + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + }, + }, + Phase: api.PodRunning, + }, nil + }, + StaticPodsConfigPath: staticPodsConfigPath, + } + executor := New(config) + hostname := "h1" + go executor.InitializeStaticPodsSource(func() { + kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates) + }) + + // create ExecutorInfo with static pod zip in data field + executorInfo := mesosutil.NewExecutorInfo( + mesosutil.NewExecutorID("ex1"), + mesosutil.NewCommandInfo("k8sm-executor"), + ) + executorInfo.Data = buf.Bytes() + + // start the executor with the static pod data + executor.Init(mockDriver) + executor.Registered(mockDriver, executorInfo, nil, nil) + + // wait for static pod to start + seenPods := map[string]struct{}{} + timeout := time.After(time.Second) + defer mockDriver.AssertExpectations(t) + for { + // filter by PodUpdate type + select { + case <-timeout: + t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods)) + case update, ok := <-updates: + if !ok { + return + } + podUpdate, ok := update.(kubelet.PodUpdate) + if !ok { + continue + } + for _, pod := range podUpdate.Pods { + seenPods[pod.Name] = struct{}{} + } + if len(seenPods) == expectedStaticPodsNum { + return + } + } + } +} + // TestExecutorFrameworkMessage ensures that the executor is able to // handle messages from the framework, specifically about lost tasks // and Kamikaze. When a task is lost, the executor needs to clean up diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index dc8ba9beabe..ad738fd8a24 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -25,6 +25,7 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "strconv" "strings" "sync" @@ -216,7 +217,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { RootDirectory: s.RootDirectory, // ConfigFile: "" // ManifestURL: "" - // FileCheckFrequency + FileCheckFrequency: s.FileCheckFrequency, // HTTPCheckFrequency PodInfraContainerImage: s.PodInfraContainerImage, SyncFrequency: s.SyncFrequency, @@ -360,6 +361,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( //TODO(jdef) either configure Watch here with something useful, or else // get rid of it from executor.Config kubeletFinished := make(chan struct{}) + staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") exec := executor.New(executor.Config{ Kubelet: klet, Updates: updates, @@ -379,6 +381,12 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { return klet.GetRuntime().GetPodStatus(pod) }, + StaticPodsConfigPath: staticPodsConfigPath, + }) + + fileSourceUpdates := pc.Channel(kubelet.FileSource) + go exec.InitializeStaticPodsSource(func() { + kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) }) k := &kubeletExecutor{ diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index 91246e40b91..aaa3c59b08a 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -379,12 +379,16 @@ func TestPlugin_LifeCycle(t *testing.T) { testApiServer := NewTestServer(t, api.NamespaceDefault, podListWatch) defer testApiServer.server.Close() + // create executor with some data for static pods if set + executor := util.NewExecutorInfo( + util.NewExecutorID("executor-id"), + util.NewCommandInfo("executor-cmd"), + ) + executor.Data = []byte{0, 1, 2} + // create scheduler testScheduler := New(Config{ - Executor: util.NewExecutorInfo( - util.NewExecutorID("executor-id"), - util.NewCommandInfo("executor-cmd"), - ), + Executor: executor, Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}), ScheduleFunc: FCFSScheduleFunc, Schedcfg: *schedcfg.CreateDefaultConfig(), @@ -477,6 +481,9 @@ func TestPlugin_LifeCycle(t *testing.T) { testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_STAGING)) testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_RUNNING)) + // check that ExecutorInfo.data has the static pod data + assert.Len(launchedTask.Executor.Data, 3) + // report back that the task has been lost mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0) testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_LOST)) diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go index a90aa1d3c00..f30dfd8c89f 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go @@ -33,8 +33,8 @@ import ( ) const ( - containerCpus = 0.25 // initial CPU allocated for executor - containerMem = 64 // initial MB of memory allocated for executor + DefaultContainerCpus = 0.25 // initial CPU allocated for executor + DefaultContainerMem = 64 // initial MB of memory allocated for executor ) type StateType int @@ -164,8 +164,8 @@ func (t *T) FillFromDetails(details *mesos.Offer) error { t.Spec = Spec{ SlaveID: details.GetSlaveId().GetValue(), - CPU: containerCpus, - Memory: containerMem, + CPU: DefaultContainerCpus, + Memory: DefaultContainerMem, } if mapping, err := t.mapper.Generate(t, details); err != nil { @@ -238,7 +238,7 @@ func (t *T) AcceptOffer(offer *mesos.Offer) bool { // resource allocation and management. // // TODO(jdef): remove hardcoded values and make use of actual pod resource settings - if (cpus < containerCpus) || (mem < containerMem) { + if (cpus < DefaultContainerCpus) || (mem < DefaultContainerMem) { log.V(3).Infof("not enough resources: cpus: %f mem: %f", cpus, mem) return false } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 8acc7517be8..3fef41c1214 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -31,6 +31,7 @@ import ( "sync" "time" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/election" execcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" @@ -41,6 +42,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" @@ -116,6 +118,7 @@ type SchedulerServer struct { KubeletHostNetworkSources string KubeletSyncFrequency time.Duration KubeletNetworkPluginName string + StaticPodsConfigPath string executable string // path to the binary running this service client *client.Client @@ -174,6 +177,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") fs.Var(&s.ClusterDNS, "cluster-dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") + fs.StringVar(&s.StaticPodsConfigPath, "static-pods-config", s.StaticPodsConfigPath, "Path for specification of static pods. Path should point to dir containing the staticPods configuration files. Defaults to none.") fs.StringVar(&s.MesosMaster, "mesos-master", s.MesosMaster, "Location of the Mesos master. The format is a comma-delimited list of of hosts like zk://host1:port,host2:port/mesos. If using ZooKeeper, pay particular attention to the leading zk:// and trailing /mesos! If not using ZooKeeper, standard URLs like http://localhost are also acceptable.") fs.StringVar(&s.MesosUser, "mesos-user", s.MesosUser, "Mesos user for this framework, defaults to root.") @@ -353,6 +357,25 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E Source: proto.String(execcfg.DefaultInfoSource), } + // Check for staticPods + if s.StaticPodsConfigPath != "" { + bs, numberStaticPods, err := archive.ZipDir(s.StaticPodsConfigPath) + if err != nil { + return nil, nil, err + } + info.Data = bs + + // Adjust the resource accounting for the executor. + // Currently each podTask accounts the default amount of resources. + // TODO(joerg84) adapt to actual resources specified by pods. + log.Infof("Detected %d staticPods in Configuration.", numberStaticPods) + + info.Resources = []*mesos.Resource{ + mutil.NewScalarResource("cpus", float64(numberStaticPods)*podtask.DefaultContainerCpus), + mutil.NewScalarResource("mem", float64(numberStaticPods)*podtask.DefaultContainerMem), + } + } + // calculate ExecutorInfo hash to be used for validating compatibility // of ExecutorInfo's generated by other HA schedulers. ehash := hashExecutorInfo(info) diff --git a/contrib/mesos/pkg/scheduler/service/service_test.go b/contrib/mesos/pkg/scheduler/service/service_test.go index 5db9c6726c1..7f4ff2656c1 100644 --- a/contrib/mesos/pkg/scheduler/service/service_test.go +++ b/contrib/mesos/pkg/scheduler/service/service_test.go @@ -19,8 +19,16 @@ limitations under the License. package service import ( + "archive/zip" + "bytes" + "io/ioutil" + "os" + "path/filepath" "testing" "time" + + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive" + "github.com/stretchr/testify/assert" ) type fakeSchedulerProcess struct { @@ -106,3 +114,42 @@ func Test_awaitFailoverDoneFailover(t *testing.T) { t.Fatalf("expected call to failover handler") } } + +func Test_StaticPods(t *testing.T) { + assert := assert.New(t) + + // create static pods config files, spod1 on toplevel and spod2 in a directory "dir" + staticPodsConfigPath, err := ioutil.TempDir(os.TempDir(), "executor-k8sm-archive") + assert.NoError(err) + defer os.RemoveAll(staticPodsConfigPath) + + spod1, err := os.Create(filepath.Join(staticPodsConfigPath, "spod1.json")) + assert.NoError(err) + _, err = spod1.WriteString("content1") + assert.NoError(err) + + err = os.Mkdir(filepath.Join(staticPodsConfigPath, "dir"), 0755) + assert.NoError(err) + + spod2, err := os.Create(filepath.Join(staticPodsConfigPath, "dir", "spod2.json")) + assert.NoError(err) + _, err = spod2.WriteString("content2") + assert.NoError(err) + + // archive config files + data, fileNum, err := archive.ZipDir(staticPodsConfigPath) + assert.NoError(err) + assert.Equal(2, fileNum) + + // unarchive config files + zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) + assert.NoError(err) + fileNames := []string{} + for _, f := range zr.File { + if !f.FileInfo().IsDir() { + fileNames = append(fileNames, f.Name) + } + } + assert.Contains(fileNames, "spod1.json") + assert.Contains(fileNames, "dir/spod2.json") +}