From 3d3577b9f381486ecae4a554a7f4a6685de71a63 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 14 Sep 2015 17:17:08 +0200 Subject: [PATCH] rewrite of static pod json zipper - add busybox static pod to mesos-docker cluster - customize static pods with binding annotations - code cleanup - removed hacky podtask.And func; support minimal resources for static pods when resource accounting is disabled - removed zip archive of static pods, changed to gzip of PodList json - pod utilities moved to package podutil - added e2e test - merge watched mirror pods into the mesos pod config stream --- cluster/mesos/docker/deploy-addons.sh | 3 +- cluster/mesos/docker/docker-compose.yml | 3 + cluster/mesos/docker/static-pod.json | 23 +++ cluster/mesos/docker/static-pods-ns.yaml | 7 + contrib/mesos/pkg/archive/zip.go | 137 --------------- contrib/mesos/pkg/archive/zip_test.go | 66 ------- contrib/mesos/pkg/executor/executor.go | 65 +++---- contrib/mesos/pkg/executor/executor_test.go | 156 ++++++----------- .../mesos/pkg/executor/service/podsource.go | 128 ++++++++++++++ contrib/mesos/pkg/executor/service/service.go | 53 +++--- contrib/mesos/pkg/{archive => podutil}/doc.go | 6 +- contrib/mesos/pkg/podutil/filters.go | 90 ++++++++++ contrib/mesos/pkg/podutil/gzip.go | 81 +++++++++ contrib/mesos/pkg/podutil/gzip_test.go | 69 ++++++++ contrib/mesos/pkg/podutil/io.go | 123 +++++++++++++ .../components/framework/framework.go | 1 - .../mesos/pkg/scheduler/meta/annotations.go | 1 + contrib/mesos/pkg/scheduler/scheduler.go | 1 - .../mesos/pkg/scheduler/service/service.go | 80 +++++---- .../pkg/scheduler/service/service_test.go | 47 ----- .../mesos/pkg/scheduler/service/validation.go | 49 ++++++ .../pkg/scheduler/service/validation_test.go | 161 ++++++++++++++++++ test/e2e/mesos.go | 18 +- 23 files changed, 909 insertions(+), 459 deletions(-) create mode 100644 cluster/mesos/docker/static-pod.json create mode 100644 cluster/mesos/docker/static-pods-ns.yaml delete mode 100644 contrib/mesos/pkg/archive/zip.go delete mode 100644 contrib/mesos/pkg/archive/zip_test.go create mode 100644 contrib/mesos/pkg/executor/service/podsource.go rename contrib/mesos/pkg/{archive => podutil}/doc.go (82%) create mode 100644 contrib/mesos/pkg/podutil/filters.go create mode 100644 contrib/mesos/pkg/podutil/gzip.go create mode 100644 contrib/mesos/pkg/podutil/gzip_test.go create mode 100644 contrib/mesos/pkg/podutil/io.go create mode 100644 contrib/mesos/pkg/scheduler/service/validation.go create mode 100644 contrib/mesos/pkg/scheduler/service/validation_test.go diff --git a/cluster/mesos/docker/deploy-addons.sh b/cluster/mesos/docker/deploy-addons.sh index b18f25de56f..81572346eef 100755 --- a/cluster/mesos/docker/deploy-addons.sh +++ b/cluster/mesos/docker/deploy-addons.sh @@ -52,8 +52,9 @@ function deploy_ui { "${kubectl}" create -f "${KUBE_ROOT}/cluster/addons/kube-ui/kube-ui-svc.yaml" } -# create the kube-system namespace +# create the kube-system and static-pods namespaces "${kubectl}" create -f "${KUBE_ROOT}/cluster/mesos/docker/kube-system-ns.yaml" +"${kubectl}" create -f "${KUBE_ROOT}/cluster/mesos/docker/static-pods-ns.yaml" if [ "${ENABLE_CLUSTER_DNS}" == true ]; then cluster::mesos::docker::run_in_temp_dir 'k8sm-dns' 'deploy_dns' diff --git a/cluster/mesos/docker/docker-compose.yml b/cluster/mesos/docker/docker-compose.yml index 0172eb5f207..31eb4ae827a 100644 --- a/cluster/mesos/docker/docker-compose.yml +++ b/cluster/mesos/docker/docker-compose.yml @@ -141,6 +141,7 @@ scheduler: --cluster-domain=cluster.local --mesos-executor-cpus=1.0 --mesos-sandbox-overlay=/opt/sandbox-overlay.tar.gz + --static-pods-config=/opt/static-pods --v=4 --executor-logv=4 --profiling=true @@ -148,6 +149,8 @@ scheduler: - etcd - mesosmaster1 - apiserver + volumes: + - ./static-pod.json:/opt/static-pods/static-pod.json keygen: image: mesosphere/kubernetes-mesos-keygen command: diff --git a/cluster/mesos/docker/static-pod.json b/cluster/mesos/docker/static-pod.json new file mode 100644 index 00000000000..46222fc9f14 --- /dev/null +++ b/cluster/mesos/docker/static-pod.json @@ -0,0 +1,23 @@ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "busybox", + "namespace": "static-pods" + }, + "spec": { + "containers": [ + { + "image": "busybox", + "command": [ + "sh", + "-c", + "exec tail -f /dev/null" + ], + "imagePullPolicy": "IfNotPresent", + "name": "busybox" + } + ], + "restartPolicy": "Always" + } +} diff --git a/cluster/mesos/docker/static-pods-ns.yaml b/cluster/mesos/docker/static-pods-ns.yaml new file mode 100644 index 00000000000..7b155af87c9 --- /dev/null +++ b/cluster/mesos/docker/static-pods-ns.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: static-pods + labels: + name: static-pods diff --git a/contrib/mesos/pkg/archive/zip.go b/contrib/mesos/pkg/archive/zip.go deleted file mode 100644 index 60ef52611c4..00000000000 --- a/contrib/mesos/pkg/archive/zip.go +++ /dev/null @@ -1,137 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package archive - -import ( - "archive/zip" - "bytes" - "fmt" - "io" - "os" - "path" - "path/filepath" -) - -// ZipWalker returns a filepath.WalkFunc that adds every filesystem node -// to the given *zip.Writer. -func ZipWalker(zw *zip.Writer) filepath.WalkFunc { - var base string - return func(path string, info os.FileInfo, err error) error { - if base == "" { - base = path - } - - header, err := zip.FileInfoHeader(info) - if err != nil { - return err - } - - if header.Name, err = filepath.Rel(base, path); err != nil { - return err - } else if info.IsDir() { - header.Name = header.Name + string(filepath.Separator) - } else { - header.Method = zip.Deflate - } - - w, err := zw.CreateHeader(header) - if err != nil { - return err - } - - if info.IsDir() { - return nil - } - - f, err := os.Open(path) - if err != nil { - return err - } - - _, err = io.Copy(w, f) - f.Close() - return err - } -} - -// Create a zip of all files in a directory recursively, return a byte array and -// the number of files archived. -func ZipDir(path string) ([]byte, []string, error) { - var buf bytes.Buffer - zw := zip.NewWriter(&buf) - zipWalker := ZipWalker(zw) - paths := []string{} - err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error { - if !info.IsDir() { - paths = append(paths, path) - } - return zipWalker(path, info, err) - })) - - if err != nil { - return nil, nil, err - } else if err = zw.Close(); err != nil { - return nil, nil, err - } - return buf.Bytes(), paths, nil -} - -// UnzipDir unzips all files from a given zip byte array into a given directory. -// The directory is created if it does not exist yet. -func UnzipDir(data []byte, destPath string) error { - // open zip - zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) - if err != nil { - return fmt.Errorf("Unzip archive read error: %v", err) - } - - for _, file := range zr.File { - // skip directories - if file.FileInfo().IsDir() { - continue - } - - // open file - rc, err := file.Open() - defer rc.Close() - if err != nil { - return fmt.Errorf("Unzip file read error: %v", err) - } - - // make sure the directory of the file exists, otherwise create - destPath := filepath.Clean(filepath.Join(destPath, file.Name)) - destBasedir := path.Dir(destPath) - err = os.MkdirAll(destBasedir, 0755) - if err != nil { - return fmt.Errorf("Unzip mkdir error: %v", err) - } - - // create file - f, err := os.Create(destPath) - if err != nil { - return fmt.Errorf("Unzip file creation error: %v", err) - } - defer f.Close() - - // write file - if _, err := io.Copy(f, rc); err != nil { - return fmt.Errorf("Unzip file write error: %v", err) - } - } - - return nil -} diff --git a/contrib/mesos/pkg/archive/zip_test.go b/contrib/mesos/pkg/archive/zip_test.go deleted file mode 100644 index 1889a124d5f..00000000000 --- a/contrib/mesos/pkg/archive/zip_test.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package archive - -import ( - "archive/zip" - "bytes" - "io/ioutil" - "os" - "path/filepath" - "testing" -) - -func TestZipWalker(t *testing.T) { - dir, err := ioutil.TempDir(os.TempDir(), "") - if err != nil { - t.Fatal(err) - } - - tree := map[string]string{"a/b/c": "12345", "a/b/d": "54321", "a/e": "00000"} - for path, content := range tree { - path = filepath.Join(dir, path) - if err := os.MkdirAll(filepath.Dir(path), os.ModeTemporary|0700); err != nil { - t.Fatal(err) - } else if err = ioutil.WriteFile(path, []byte(content), 0700); err != nil { - t.Fatal(err) - } - } - - var buf bytes.Buffer - zw := zip.NewWriter(&buf) - if err := filepath.Walk(dir, ZipWalker(zw)); err != nil { - t.Fatal(err) - } else if err = zw.Close(); err != nil { - t.Fatal(err) - } - - zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - if err != nil { - t.Fatal(err) - } - - for _, file := range zr.File { - if rc, err := file.Open(); err != nil { - t.Fatal(err) - } else if got, err := ioutil.ReadAll(rc); err != nil { - t.Error(err) - } else if want := []byte(tree[file.Name]); !bytes.Equal(got, want) { - t.Errorf("%s\ngot: %s\nwant: %s", file.Name, got, want) - } - } -} diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index f063ffdc54c..f820b5153be 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -30,9 +30,9 @@ import ( bindings "github.com/mesos/mesos-go/executor" mesos "github.com/mesos/mesos-go/mesosproto" mutil "github.com/mesos/mesos-go/mesosutil" - "k8s.io/kubernetes/contrib/mesos/pkg/archive" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/node" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/pkg/api" unversionedapi "k8s.io/kubernetes/pkg/api/unversioned" @@ -205,11 +205,19 @@ func (k *Executor) isDone() bool { } } -// sendPodUpdate assumes that caller is holding state lock; returns true when update is sent otherwise false -func (k *Executor) sendPodUpdate(u *kubetypes.PodUpdate) bool { +// sendPodsSnapshot assumes that caller is holding state lock; returns true when update is sent otherwise false +func (k *Executor) sendPodsSnapshot() bool { if k.isDone() { return false } + snapshot := make([]*api.Pod, 0, len(k.pods)) + for _, v := range k.pods { + snapshot = append(snapshot, v) + } + u := &kubetypes.PodUpdate{ + Op: kubetypes.SET, + Pods: snapshot, + } k.updateChan <- *u return true } @@ -227,7 +235,10 @@ func (k *Executor) Registered(driver bindings.ExecutorDriver, } if executorInfo != nil && executorInfo.Data != nil { - k.initializeStaticPodsSource(executorInfo.Data) + err := k.initializeStaticPodsSource(slaveInfo.GetHostname(), executorInfo.Data) + if err != nil { + log.Errorf("failed to initialize static pod configuration: %v", err) + } } if slaveInfo != nil { @@ -240,10 +251,7 @@ func (k *Executor) Registered(driver bindings.ExecutorDriver, // emit an empty update to allow the mesos "source" to be marked as seen k.lock.Lock() defer k.lock.Unlock() - k.sendPodUpdate(&kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - }) + k.sendPodsSnapshot() if slaveInfo != nil && k.nodeInfos != nil { k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics @@ -280,13 +288,15 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos } // initializeStaticPodsSource unzips the data slice into the static-pods directory -func (k *Executor) initializeStaticPodsSource(data []byte) { +func (k *Executor) initializeStaticPodsSource(hostname string, data []byte) error { log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) - err := archive.UnzipDir(data, k.staticPodsConfigPath) - if err != nil { - log.Errorf("Failed to extract static pod config: %v", err) - return - } + // annotate the pod with BindingHostKey so that the scheduler will ignore the pod + // once it appears in the pod registry. the stock kubelet sets the pod host in order + // to accomplish the same; we do this because the k8sm scheduler works differently. + annotator := podutil.Annotator(map[string]string{ + meta.BindingHostKey: hostname, + }) + return podutil.WriteToDir(annotator.Do(podutil.Gunzip(data)), k.staticPodsConfigPath) } // Disconnected is called when the executor is disconnected from the slave. @@ -390,10 +400,7 @@ func (k *Executor) handleChangedApiserverPod(pod *api.Pod) { oldPod.DeletionTimestamp = pod.DeletionTimestamp oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds - k.sendPodUpdate(&kubetypes.PodUpdate{ - Op: kubetypes.UPDATE, - Pods: []*api.Pod{oldPod}, - }) + k.sendPodsSnapshot() } } } @@ -557,17 +564,14 @@ func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod //TODO(jdef) check for duplicate pod name, if found send TASK_ERROR // send the new pod to the kubelet which will spin it up - ok := k.sendPodUpdate(&kubetypes.PodUpdate{ - Op: kubetypes.ADD, - Pods: []*api.Pod{pod}, - }) - if !ok { - return // executor is terminating, cancel launch - } - - // mark task as sent by setting the podName and register the sent pod task.podName = podFullName k.pods[podFullName] = pod + ok := k.sendPodsSnapshot() + if !ok { + task.podName = "" + delete(k.pods, podFullName) + return // executor is terminating, cancel launch + } // From here on, we need to delete containers associated with the task upon // it going into a terminal state. @@ -752,7 +756,7 @@ func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason str k.resetSuicideWatch(driver) pid := task.podName - pod, found := k.pods[pid] + _, found := k.pods[pid] if !found { log.Warningf("Cannot remove unknown pod %v for task %v", pid, tid) } else { @@ -760,10 +764,7 @@ func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason str delete(k.pods, pid) // tell the kubelet to remove the pod - k.sendPodUpdate(&kubetypes.PodUpdate{ - Op: kubetypes.REMOVE, - Pods: []*api.Pod{pod}, - }) + k.sendPodsSnapshot() } // TODO(jdef): ensure that the update propagates, perhaps return a signal chan? k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason)) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 959a95398fb..166b67235bb 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -17,14 +17,13 @@ limitations under the License. package executor import ( - "archive/zip" - "bytes" "fmt" "io/ioutil" "net" "net/http" "net/http/httptest" "os" + "path/filepath" "reflect" "sync" "sync/atomic" @@ -33,6 +32,7 @@ import ( assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" kmruntime "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/pkg/api" @@ -41,7 +41,6 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/kubelet" - kconfig "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" @@ -49,7 +48,6 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -246,123 +244,79 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { // TestExecutorStaticPods test that the ExecutorInfo.data is parsed // as a zip archive with pod definitions. -func TestExecutorStaticPods(t *testing.T) { +func TestExecutorInitializeStaticPodsSource(t *testing.T) { // create some zip with static pod definition - var buf bytes.Buffer - zw := zip.NewWriter(&buf) - createStaticPodFile := func(fileName, id, name string) { - w, err := zw.Create(fileName) - assert.NoError(t, err) - spod := `{ - "apiVersion": "v1", - "kind": "Pod", - "metadata": { - "name": "%v", - "labels": { "name": "foo", "cluster": "bar" } - }, - "spec": { - "containers": [{ - "name": "%v", - "image": "library/nginx", - "ports": [{ "containerPort": 80, "name": "http" }], - "livenessProbe": { - "enabled": true, - "type": "http", - "initialDelaySeconds": 30, - "httpGet": { "path": "/", "port": 80 } + givenPodsDir, err := ioutil.TempDir("/tmp", "executor-givenpods") + assert.NoError(t, err) + defer os.RemoveAll(givenPodsDir) + + var wg sync.WaitGroup + reportErrors := func(errCh <-chan error) { + wg.Add(1) + go func() { + defer wg.Done() + for err := range errCh { + t.Error(err) } - }] + }() } - }` - _, err = w.Write([]byte(fmt.Sprintf(spod, id, name))) + + createStaticPodFile := func(fileName, name string) { + spod := `{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "%v", + "namespace": "staticpods", + "labels": { "name": "foo", "cluster": "bar" } + }, + "spec": { + "containers": [{ + "name": "%v", + "image": "library/nginx", + "ports": [{ "containerPort": 80, "name": "http" }] + }] + } + }` + destfile := filepath.Join(givenPodsDir, fileName) + err = os.MkdirAll(filepath.Dir(destfile), 0770) + assert.NoError(t, err) + err = ioutil.WriteFile(destfile, []byte(fmt.Sprintf(spod, name, name)), 0660) assert.NoError(t, err) } - createStaticPodFile("spod.json", "spod-id-01", "spod-01") - createStaticPodFile("spod2.json", "spod-id-02", "spod-02") - createStaticPodFile("dir/spod.json", "spod-id-03", "spod-03") // same file name as first one to check for overwriting - expectedStaticPodsNum := 2 // subdirectories are ignored by FileSource, hence only 2 + createStaticPodFile("spod.json", "spod-01") + createStaticPodFile("spod2.json", "spod-02") + createStaticPodFile("dir/spod.json", "spod-03") // same file name as first one to check for overwriting + staticpods, errs := podutil.ReadFromDir(givenPodsDir) + reportErrors(errs) - err := zw.Close() + gzipped, err := podutil.Gzip(staticpods) assert.NoError(t, err) - // create fake apiserver - testApiServer := NewTestServer(t, api.NamespaceDefault, nil) - defer testApiServer.server.Close() + expectedStaticPodsNum := 2 // subdirectories are ignored by FileSource, hence only 2 // temporary directory which is normally located in the executor sandbox staticPodsConfigPath, err := ioutil.TempDir("/tmp", "executor-k8sm-archive") assert.NoError(t, err) defer os.RemoveAll(staticPodsConfigPath) - mockDriver := &MockExecutorDriver{} - config := Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init - NodeInfos: make(chan NodeInfo, 1), - APIClient: client.NewOrDie(&client.Config{ - Host: testApiServer.server.URL, - Version: testapi.Default.Version(), - }), - PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { - return &api.PodStatus{ - ContainerStatuses: []api.ContainerStatus{ - { - Name: "foo", - State: api.ContainerState{ - Running: &api.ContainerStateRunning{}, - }, - }, - }, - Phase: api.PodRunning, - }, nil - }, - StaticPodsConfigPath: staticPodsConfigPath, - PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, + executor := &Executor{ + staticPodsConfigPath: staticPodsConfigPath, } - executor := New(config) - // register static pod source + // extract the pods into staticPodsConfigPath hostname := "h1" - fileSourceUpdates := make(chan interface{}, 1024) - kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, fileSourceUpdates) + err = executor.initializeStaticPodsSource(hostname, gzipped) + assert.NoError(t, err) - // create ExecutorInfo with static pod zip in data field - executorInfo := mesosutil.NewExecutorInfo( - mesosutil.NewExecutorID("ex1"), - mesosutil.NewCommandInfo("k8sm-executor"), - ) - executorInfo.Data = buf.Bytes() + actualpods, errs := podutil.ReadFromDir(staticPodsConfigPath) + reportErrors(errs) - // start the executor with the static pod data - executor.Init(mockDriver) - executor.Registered(mockDriver, executorInfo, nil, nil) - - // wait for static pod to start - seenPods := map[string]struct{}{} - timeout := time.After(util.ForeverTestTimeout) - defer mockDriver.AssertExpectations(t) - for { - // filter by PodUpdate type - select { - case <-timeout: - t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods)) - case update, ok := <-fileSourceUpdates: - if !ok { - return - } - podUpdate, ok := update.(kubetypes.PodUpdate) - if !ok { - continue - } - for _, pod := range podUpdate.Pods { - seenPods[pod.Name] = struct{}{} - } - if len(seenPods) == expectedStaticPodsNum { - return - } - } - } + list := podutil.List(actualpods) + assert.NotNil(t, list) + assert.Equal(t, expectedStaticPodsNum, len(list.Items)) + wg.Wait() } // TestExecutorFrameworkMessage ensures that the executor is able to diff --git a/contrib/mesos/pkg/executor/service/podsource.go b/contrib/mesos/pkg/executor/service/podsource.go new file mode 100644 index 00000000000..1cb25488a22 --- /dev/null +++ b/contrib/mesos/pkg/executor/service/podsource.go @@ -0,0 +1,128 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "fmt" + + log "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" +) + +const ( + // if we don't use this source then the kubelet will do funny, mirror things. we alias + // this here for convenience. see the docs for sourceMesos for additional explanation. + // @see ConfigSourceAnnotationKey + mesosSource = kubetypes.ApiserverSource +) + +// sourceMesos merges pods from mesos, and mirror pods from the apiserver. why? +// (a) can't have two sources with the same name; +// (b) all sources, other than ApiserverSource are considered static/mirror +// sources, and; +// (c) kubelet wants to see mirror pods reflected in a non-static source. +// +// Mesos pods must appear to come from apiserver due to (b), while reflected +// static pods (mirror pods) must appear to come from apiserver due to (c). +// +// The only option I could think of was creating a source that merges the pod +// streams. I don't like it. But I could think of anything else, other than +// starting to hack up the kubelet's understanding of mirror/static pod +// sources (ouch!) +type sourceMesos struct { + sourceFinished chan struct{} // sourceFinished closes when mergeAndForward exits + out chan<- interface{} // out is the sink for merged pod snapshots + mirrorPods chan []*api.Pod // mirrorPods communicates snapshots of the current set of mirror pods + execUpdates <-chan kubetypes.PodUpdate // execUpdates receives snapshots of the current set of mesos pods +} + +// newSourceMesos creates a pod config source that merges pod updates from +// mesos (via execUpdates), and mirror pod updates from the apiserver (via +// podWatch) writing the merged update stream to the out chan. It is expected +// that execUpdates will only ever contain SET operations. The source takes +// ownership of the sourceFinished chan, closing it when the source terminates. +// Source termination happens when the execUpdates chan is closed and fully +// drained of updates. +func newSourceMesos( + sourceFinished chan struct{}, + execUpdates <-chan kubetypes.PodUpdate, + out chan<- interface{}, + podWatch *cache.ListWatch, +) { + source := &sourceMesos{ + sourceFinished: sourceFinished, + mirrorPods: make(chan []*api.Pod), + execUpdates: execUpdates, + out: out, + } + // reflect changes from the watch into a chan, filtered to include only mirror pods (have an ConfigMirrorAnnotationKey attr) + cache.NewReflector(podWatch, &api.Pod{}, cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc), 0).RunUntil(sourceFinished) + go source.mergeAndForward() +} + +func (source *sourceMesos) send(objs []interface{}) { + var mirrors []*api.Pod + for _, o := range objs { + p := o.(*api.Pod) + if _, ok := p.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok { + mirrors = append(mirrors, p) + } + } + select { + case <-source.sourceFinished: + case source.mirrorPods <- mirrors: + } +} + +func (source *sourceMesos) mergeAndForward() { + // execUpdates will be closed by the executor on shutdown + defer close(source.sourceFinished) + var ( + mirrors = []*api.Pod{} + pods = []*api.Pod{} + ) +eventLoop: + for { + select { + case m := <-source.mirrorPods: + mirrors = m[:] + u := kubetypes.PodUpdate{ + Op: kubetypes.SET, + Pods: append(m, pods...), + Source: mesosSource, + } + log.V(3).Infof("mirror update, sending snapshot of size %d", len(u.Pods)) + source.out <- u + case u, ok := <-source.execUpdates: + if !ok { + break eventLoop + } + if u.Op != kubetypes.SET { + panic(fmt.Sprintf("unexpected Op type: %v", u.Op)) + } + + pods = u.Pods[:] + u.Pods = append(u.Pods, mirrors...) + u.Source = mesosSource + log.V(3).Infof("pods update, sending snapshot of size %d", len(u.Pods)) + source.out <- u + } + } + log.V(2).Infoln("mesos pod source terminating normally") +} diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index bce1cbec0ec..7fca7c07104 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -42,12 +42,6 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) -const ( - // if we don't use this source then the kubelet will do funny, mirror things. - // @see ConfigSourceAnnotationKey - MESOS_CFG_SOURCE = kubetypes.ApiserverSource -) - type KubeletExecutorServer struct { *app.KubeletServer SuicideTimeout time.Duration @@ -79,8 +73,14 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.") } -func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, nodeInfos chan<- executor.NodeInfo, kubeletFinished <-chan struct{}, - staticPodsConfigPath string, apiclient *client.Client) error { +func (s *KubeletExecutorServer) runExecutor( + execUpdates chan<- kubetypes.PodUpdate, + nodeInfos chan<- executor.NodeInfo, + kubeletFinished <-chan struct{}, + staticPodsConfigPath string, + apiclient *client.Client, + podLW *cache.ListWatch, +) error { exec := executor.New(executor.Config{ Updates: execUpdates, APIClient: apiclient, @@ -111,10 +111,8 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda return status, nil }, StaticPodsConfigPath: staticPodsConfigPath, - PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll, - fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride), - ), - NodeInfos: nodeInfos, + PodLW: podLW, + NodeInfos: nodeInfos, }) // initialize driver and initialize the executor with it @@ -141,8 +139,14 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda return nil } -func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, nodeInfos <-chan executor.NodeInfo, kubeletDone chan<- struct{}, - staticPodsConfigPath string, apiclient *client.Client) error { +func (s *KubeletExecutorServer) runKubelet( + execUpdates <-chan kubetypes.PodUpdate, + nodeInfos <-chan executor.NodeInfo, + kubeletDone chan<- struct{}, + staticPodsConfigPath string, + apiclient *client.Client, + podLW *cache.ListWatch, +) error { kcfg, err := s.UnsecuredKubeletConfig() if err == nil { // apply Messo specific settings @@ -199,17 +203,8 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat } }() - // create main pod source - updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE) - go func() { - // execUpdates will be closed by the executor on shutdown - defer close(executorDone) - - for u := range execUpdates { - u.Source = MESOS_CFG_SOURCE - updates <- u - } - }() + // create main pod source, it will close executorDone when the executor updates stop flowing + newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW) // create static-pods directory file source log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) @@ -257,14 +252,18 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { return fmt.Errorf("cannot create API client: %v", err) } + pw := cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll, + fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride), + ) + // start executor - err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient) + err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw) if err != nil { return err } // start kubelet, blocking - return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient) + return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw) } func defaultBindingAddress() string { diff --git a/contrib/mesos/pkg/archive/doc.go b/contrib/mesos/pkg/podutil/doc.go similarity index 82% rename from contrib/mesos/pkg/archive/doc.go rename to contrib/mesos/pkg/podutil/doc.go index a4e311de230..89086ecf463 100644 --- a/contrib/mesos/pkg/archive/doc.go +++ b/contrib/mesos/pkg/podutil/doc.go @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package archive provides utilities to archive and unarchive filesystem -// hierarchies. -package archive +// podutil contains utilities for reading, writing and filtering streams +// and lists of api.Pod objects. +package podutil diff --git a/contrib/mesos/pkg/podutil/filters.go b/contrib/mesos/pkg/podutil/filters.go new file mode 100644 index 00000000000..f90009b5d91 --- /dev/null +++ b/contrib/mesos/pkg/podutil/filters.go @@ -0,0 +1,90 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podutil + +import ( + log "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" +) + +type defaultFunc func(pod *api.Pod) error + +type FilterFunc func(pod *api.Pod) (bool, error) + +type Filters []FilterFunc + +// Annotate safely copies annotation metadata from kv to meta.Annotations. +func Annotate(meta *api.ObjectMeta, kv map[string]string) { + //TODO(jdef) this func probably belong in an "apiutil" package, but we don't + //have much to put there right now so it can just live here. + if meta.Annotations == nil { + meta.Annotations = make(map[string]string) + } + for k, v := range kv { + meta.Annotations[k] = v + } +} + +// Annotator returns a filter that copies annotations from map m into a pod +func Annotator(m map[string]string) FilterFunc { + return FilterFunc(func(pod *api.Pod) (bool, error) { + Annotate(&pod.ObjectMeta, m) + return true, nil + }) +} + +// Stream returns a chan of pods that yields each pod from the given list. +// No pods are yielded if err is non-nil. +func Stream(list *api.PodList, err error) <-chan *api.Pod { + out := make(chan *api.Pod) + go func() { + defer close(out) + if err != nil { + log.Errorf("failed to obtain pod list: %v", err) + return + } + for _, pod := range list.Items { + pod := pod + out <- &pod + } + }() + return out +} + +func (filter FilterFunc) Do(in <-chan *api.Pod) <-chan *api.Pod { + out := make(chan *api.Pod) + go func() { + defer close(out) + for pod := range in { + if ok, err := filter(pod); err != nil { + log.Errorf("pod failed selection: %v", err) + } else if ok { + out <- pod + } + } + }() + return out +} + +// List reads every pod from the pods chan and returns them all in an api.PodList +func List(pods <-chan *api.Pod) *api.PodList { + list := &api.PodList{} + for p := range pods { + list.Items = append(list.Items, *p) + } + return list +} diff --git a/contrib/mesos/pkg/podutil/gzip.go b/contrib/mesos/pkg/podutil/gzip.go new file mode 100644 index 00000000000..751f5d5e78c --- /dev/null +++ b/contrib/mesos/pkg/podutil/gzip.go @@ -0,0 +1,81 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podutil + +import ( + "bytes" + "compress/gzip" + "fmt" + "io/ioutil" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" +) + +func Gzip(pods <-chan *api.Pod) ([]byte, error) { + return gzipList(List(pods)) +} + +func gzipList(list *api.PodList) ([]byte, error) { + raw, err := v1.Codec.Encode(list) + if err != nil { + return nil, err + } + + zipped := &bytes.Buffer{} + zw := gzip.NewWriter(zipped) + _, err = bytes.NewBuffer(raw).WriteTo(zw) + if err != nil { + return nil, err + } + + err = zw.Close() + if err != nil { + return nil, err + } + + return zipped.Bytes(), nil +} + +func Gunzip(gzipped []byte) <-chan *api.Pod { + return Stream(gunzipList(gzipped)) +} + +func gunzipList(gzipped []byte) (*api.PodList, error) { + zr, err := gzip.NewReader(bytes.NewReader(gzipped)) + if err != nil { + return nil, err + } + defer zr.Close() + + raw, err := ioutil.ReadAll(zr) + if err != nil { + return nil, err + } + + obj, err := api.Scheme.Decode(raw) + if err != nil { + return nil, err + } + + podlist, ok := obj.(*api.PodList) + if !ok { + return nil, fmt.Errorf("expected *api.PodList instead of %T", obj) + } + + return podlist, nil +} diff --git a/contrib/mesos/pkg/podutil/gzip_test.go b/contrib/mesos/pkg/podutil/gzip_test.go new file mode 100644 index 00000000000..9affd8e54c4 --- /dev/null +++ b/contrib/mesos/pkg/podutil/gzip_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" +) + +func TestGzipList(t *testing.T) { + // pod spec defaults are written during deserialization, this is what we + // expect them to be + period := int64(v1.DefaultTerminationGracePeriodSeconds) + defaultSpec := api.PodSpec{ + DNSPolicy: api.DNSClusterFirst, + RestartPolicy: api.RestartPolicyAlways, + TerminationGracePeriodSeconds: &period, + SecurityContext: new(api.PodSecurityContext), + } + list := &api.PodList{ + Items: []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "qax", + Namespace: "lkj", + }, + }, + }, + } + + amap := map[string]string{ + "crazy": "horse", + } + annotator := Annotator(amap) + raw, err := Gzip(annotator.Do(Stream(list, nil))) + assert.NoError(t, err) + + list2, err := gunzipList(raw) + assert.NoError(t, err) + + list.Items[0].Spec = defaultSpec + list.Items[0].Annotations = amap + list.Items[1].Spec = defaultSpec + list.Items[1].Annotations = amap + assert.True(t, api.Semantic.DeepEqual(*list, *list2), "expected %+v instead of %+v", *list, *list2) +} diff --git a/contrib/mesos/pkg/podutil/io.go b/contrib/mesos/pkg/podutil/io.go new file mode 100644 index 00000000000..e21c22f7ab7 --- /dev/null +++ b/contrib/mesos/pkg/podutil/io.go @@ -0,0 +1,123 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podutil + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + + log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/api/validation" + utilyaml "k8s.io/kubernetes/pkg/util/yaml" +) + +func WriteToDir(pods <-chan *api.Pod, destDir string) error { + err := os.MkdirAll(destDir, 0660) + if err != nil { + return err + } + for p := range pods { + filename, ok := p.Annotations[meta.StaticPodFilenameKey] + if !ok { + log.Warningf("skipping static pod %s/%s that had no filename", p.Namespace, p.Name) + continue + } + raw, err := v1.Codec.Encode(p) + if err != nil { + log.Errorf("failed to encode static pod as v1 object: %v", err) + continue + } + destfile := filepath.Join(destDir, filename) + err = ioutil.WriteFile(destfile, raw, 0660) + if err != nil { + log.Errorf("failed to write static pod file %q: %v", destfile, err) + } + log.V(1).Infof("wrote static pod %s/%s to %s", p.Namespace, p.Name, destfile) + } + return nil +} + +func ReadFromDir(dirpath string) (<-chan *api.Pod, <-chan error) { + pods := make(chan *api.Pod) + errors := make(chan error) + go func() { + defer close(pods) + defer close(errors) + files, err := ioutil.ReadDir(dirpath) + if err != nil { + errors <- fmt.Errorf("error scanning static pods directory: %q: %v", dirpath, err) + return + } + for _, f := range files { + if f.IsDir() || f.Size() == 0 { + continue + } + filename := filepath.Join(dirpath, f.Name()) + log.V(1).Infof("reading static pod conf from file %q", filename) + + data, err := ioutil.ReadFile(filename) + if err != nil { + errors <- fmt.Errorf("failed to read static pod file: %q: %v", filename, err) + continue + } + + parsed, pod, err := tryDecodeSinglePod(data) + if !parsed { + if err != nil { + errors <- fmt.Errorf("error parsing static pod file %q: %v", filename, err) + } + continue + } + if err != nil { + errors <- fmt.Errorf("error validating static pod file %q: %v", filename, err) + continue + } + Annotate(&pod.ObjectMeta, map[string]string{meta.StaticPodFilenameKey: f.Name()}) + pods <- pod + } + }() + return pods, errors +} + +// tryDecodeSinglePod was copied from pkg/kubelet/config/common.go v1.0.5 +func tryDecodeSinglePod(data []byte) (parsed bool, pod *api.Pod, err error) { + // JSON is valid YAML, so this should work for everything. + json, err := utilyaml.ToJSON(data) + if err != nil { + return false, nil, err + } + obj, err := api.Scheme.Decode(json) + if err != nil { + return false, pod, err + } + // Check whether the object could be converted to single pod. + if _, ok := obj.(*api.Pod); !ok { + err = fmt.Errorf("invalid pod: %+v", obj) + return false, pod, err + } + newPod := obj.(*api.Pod) + if errs := validation.ValidatePod(newPod); len(errs) > 0 { + err = fmt.Errorf("invalid pod: %v", errs) + return true, pod, err + } + return true, newPod, nil +} diff --git a/contrib/mesos/pkg/scheduler/components/framework/framework.go b/contrib/mesos/pkg/scheduler/components/framework/framework.go index 575441f9f7a..fec79ea0eb4 100644 --- a/contrib/mesos/pkg/scheduler/components/framework/framework.go +++ b/contrib/mesos/pkg/scheduler/components/framework/framework.go @@ -547,7 +547,6 @@ func (k *framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.Sl // ExecutorLost is called when some executor is lost. func (k *framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status) - // TODO(yifan): Restart any unfinished tasks of the executor. } // Error is called when there is an unrecoverable error in the scheduler or scheduler driver. diff --git a/contrib/mesos/pkg/scheduler/meta/annotations.go b/contrib/mesos/pkg/scheduler/meta/annotations.go index 361b8eca87b..c7d61886626 100644 --- a/contrib/mesos/pkg/scheduler/meta/annotations.go +++ b/contrib/mesos/pkg/scheduler/meta/annotations.go @@ -30,4 +30,5 @@ const ( PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_" PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s" ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d" + StaticPodFilenameKey = "k8s.mesosphere.io/staticPodFilename" ) diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 242a829b7c6..eadc03edfdf 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -33,6 +33,5 @@ type Scheduler interface { Reconcile(t *podtask.T) KillTask(id string) error LaunchTask(t *podtask.T) error - Run(done <-chan struct{}) } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 244f7e6a91e..7691b11d7c0 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -18,7 +18,6 @@ package service import ( "bufio" - "encoding/json" "errors" "fmt" "io/ioutil" @@ -47,11 +46,11 @@ import ( "github.com/spf13/pflag" "golang.org/x/net/context" - "k8s.io/kubernetes/contrib/mesos/pkg/archive" "k8s.io/kubernetes/contrib/mesos/pkg/election" execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" minioncfg "k8s.io/kubernetes/contrib/mesos/pkg/minion/config" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" "k8s.io/kubernetes/contrib/mesos/pkg/profile" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components" @@ -75,6 +74,9 @@ import ( "k8s.io/kubernetes/pkg/master/ports" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/tools" + + // lock to this API version, compilation will fail when this becomes unsupported + _ "k8s.io/kubernetes/pkg/api/v1" ) const ( @@ -431,46 +433,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E } // Check for staticPods - var staticPodCPUs, staticPodMem float64 - if s.staticPodsConfigPath != "" { - bs, paths, err := archive.ZipDir(s.staticPodsConfigPath) - if err != nil { - return nil, nil, err - } - - // try to read pod files and sum resources - // TODO(sttts): don't terminate when static pods are broken, but skip them - // TODO(sttts): add a directory watch and tell running executors about updates - for _, podPath := range paths { - podJson, err := ioutil.ReadFile(podPath) - if err != nil { - return nil, nil, fmt.Errorf("error reading static pod spec: %v", err) - } - - pod := api.Pod{} - err = json.Unmarshal(podJson, &pod) - if err != nil { - return nil, nil, fmt.Errorf("error parsing static pod spec at %v: %v", podPath, err) - } - - _, cpu, _, err := mresource.LimitPodCPU(&pod, s.defaultContainerCPULimit) - if err != nil { - return nil, nil, fmt.Errorf("cannot derive cpu limit for static pod: %v", podPath) - } - _, mem, _, err := mresource.LimitPodMem(&pod, s.defaultContainerMemLimit) - if err != nil { - return nil, nil, fmt.Errorf("cannot derive memory limit for static pod: %v", podPath) - } - - log.V(2).Infof("reserving %.2f cpu shares and %.2f MB of memory to static pod %s", cpu, mem, pod.Name) - - staticPodCPUs += float64(cpu) - staticPodMem += float64(mem) - } - - // pass zipped pod spec to executor - execInfo.Data = bs - } + data, staticPodCPUs, staticPodMem := s.prepareStaticPods() execInfo.Resources = []*mesos.Resource{ mutil.NewScalarResource("cpus", float64(s.mesosExecutorCPUs)+staticPodCPUs), @@ -482,10 +445,43 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E ehash := hashExecutorInfo(execInfo) eid := uid.New(ehash, execcfg.DefaultInfoID) execInfo.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())} + execInfo.Data = data return execInfo, eid, nil } +func (s *SchedulerServer) prepareStaticPods() (data []byte, staticPodCPUs, staticPodMem float64) { + // TODO(sttts): add a directory watch and tell running executors about updates + if s.staticPodsConfigPath == "" { + return + } + + entries, errCh := podutil.ReadFromDir(s.staticPodsConfigPath) + go func() { + // we just skip file system errors for now, do our best to gather + // as many static pod specs as we can. + for err := range errCh { + log.Errorln(err.Error()) + } + }() + + // validate cpu and memory limits, tracking the running totals in staticPod{CPUs,Mem} + validateResourceLimits := StaticPodValidator( + s.defaultContainerCPULimit, + s.defaultContainerMemLimit, + &staticPodCPUs, + &staticPodMem) + + zipped, err := podutil.Gzip(validateResourceLimits.Do(entries)) + if err != nil { + log.Errorf("failed to generate static pod data: %v", err) + staticPodCPUs, staticPodMem = 0, 0 + } else { + data = zipped + } + return +} + // TODO(jdef): hacked from kubelet/server/server.go // TODO(k8s): replace this with clientcmd func (s *SchedulerServer) createAPIServerClient() (*client.Client, error) { diff --git a/contrib/mesos/pkg/scheduler/service/service_test.go b/contrib/mesos/pkg/scheduler/service/service_test.go index 26eaa5a63d5..24c74e2adc4 100644 --- a/contrib/mesos/pkg/scheduler/service/service_test.go +++ b/contrib/mesos/pkg/scheduler/service/service_test.go @@ -1,5 +1,3 @@ -// +build unit_test - /* Copyright 2015 The Kubernetes Authors All rights reserved. @@ -19,15 +17,9 @@ limitations under the License. package service import ( - "archive/zip" - "bytes" - "io/ioutil" - "os" - "path/filepath" "testing" "time" - "k8s.io/kubernetes/contrib/mesos/pkg/archive" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" "github.com/stretchr/testify/assert" @@ -124,42 +116,3 @@ func Test_DefaultResourceLimits(t *testing.T) { assert.Equal(s.defaultContainerCPULimit, mresource.DefaultDefaultContainerCPULimit) assert.Equal(s.defaultContainerMemLimit, mresource.DefaultDefaultContainerMemLimit) } - -func Test_StaticPods(t *testing.T) { - assert := assert.New(t) - - // create static pods config files, spod1 on toplevel and spod2 in a directory "dir" - staticPodsConfigPath, err := ioutil.TempDir(os.TempDir(), "executor-k8sm-archive") - assert.NoError(err) - defer os.RemoveAll(staticPodsConfigPath) - - spod1, err := os.Create(filepath.Join(staticPodsConfigPath, "spod1.json")) - assert.NoError(err) - _, err = spod1.WriteString("content1") - assert.NoError(err) - - err = os.Mkdir(filepath.Join(staticPodsConfigPath, "dir"), 0755) - assert.NoError(err) - - spod2, err := os.Create(filepath.Join(staticPodsConfigPath, "dir", "spod2.json")) - assert.NoError(err) - _, err = spod2.WriteString("content2") - assert.NoError(err) - - // archive config files - data, paths, err := archive.ZipDir(staticPodsConfigPath) - assert.NoError(err) - assert.Equal(2, len(paths)) - - // unarchive config files - zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) - assert.NoError(err) - fileNames := []string{} - for _, f := range zr.File { - if !f.FileInfo().IsDir() { - fileNames = append(fileNames, f.Name) - } - } - assert.Contains(fileNames, "spod1.json") - assert.Contains(fileNames, "dir/spod2.json") -} diff --git a/contrib/mesos/pkg/scheduler/service/validation.go b/contrib/mesos/pkg/scheduler/service/validation.go new file mode 100644 index 00000000000..4b58532ce6e --- /dev/null +++ b/contrib/mesos/pkg/scheduler/service/validation.go @@ -0,0 +1,49 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/pkg/api" +) + +// StaticPodValidator discards a pod if we can't calculate resource limits for it. +func StaticPodValidator( + defaultContainerCPULimit resource.CPUShares, + defaultContainerMemLimit resource.MegaBytes, + accumCPU, accumMem *float64, +) podutil.FilterFunc { + return podutil.FilterFunc(func(pod *api.Pod) (bool, error) { + _, cpu, _, err := resource.LimitPodCPU(pod, defaultContainerCPULimit) + if err != nil { + return false, err + } + + _, mem, _, err := resource.LimitPodMem(pod, defaultContainerMemLimit) + if err != nil { + return false, err + } + + log.V(2).Infof("reserving %.2f cpu shares and %.2f MB of memory to static pod %s/%s", cpu, mem, pod.Namespace, pod.Name) + + *accumCPU += float64(cpu) + *accumMem += float64(mem) + return true, nil + }) +} diff --git a/contrib/mesos/pkg/scheduler/service/validation_test.go b/contrib/mesos/pkg/scheduler/service/validation_test.go new file mode 100644 index 00000000000..744e5b421c5 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/service/validation_test.go @@ -0,0 +1,161 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" + mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/service" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" +) + +func TestStaticPodValidator(t *testing.T) { + // test within limits + tests := []struct { + // given + pods <-chan *api.Pod + // wants + podcount int + cputot float64 + memtot float64 + }{ + // test: valid, pod specifies limits for ALL containers + { + pods: pods(pod( + podName("foo", "bar"), + containers( + container(resourceLimits(10, 20)), // min is 32 + container(resourceLimits(30, 40)), + container(resourceLimits(50, 60)), + ), + )), + podcount: 1, + cputot: 90, + memtot: 132, + }, + // test: valid, multiple pods, specify limits for ALL containers + { + pods: pods( + pod( + podName("foo", "bar"), + containers( + container(resourceLimits(10, 20)), // min is 32 + container(resourceLimits(30, 40)), + container(resourceLimits(50, 60)), + ), + ), + pod( + podName("kjh", "jkk"), + containers( + container(resourceLimits(15, 25)), // min is 32 + container(resourceLimits(35, 45)), + container(resourceLimits(55, 65)), + ), + ), + ), + podcount: 2, + cputot: 195, + memtot: 274, + }, + // test: no limits on CT in first pod so it's rejected + { + pods: pods( + pod( + podName("foo", "bar"), + containers( + container(resourceLimits(10, 20)), // min is 32 + container(), // min is 0.01, 32 + container(resourceLimits(50, 60)), + ), + ), + pod( + podName("wza", "wer"), + containers( + container(resourceLimits(10, 20)), // min is 32 + container(resourceLimits(30, 40)), + container(resourceLimits(50, 60)), + ), + ), + ), + podcount: 2, + cputot: 60.01 + 90, + memtot: 124 + 132, + }, + } + for i, tc := range tests { + var cpu, mem float64 + f := service.StaticPodValidator(0, 0, &cpu, &mem) + list := podutil.List(f.Do(tc.pods)) + assert.Equal(t, tc.podcount, len(list.Items), "test case #%d: expected %d pods instead of %d", i, tc.podcount, len(list.Items)) + assert.EqualValues(t, tc.cputot, cpu, "test case #%d: expected %f total cpu instead of %f", i, tc.cputot, cpu) + assert.EqualValues(t, tc.memtot, mem, "test case #%d: expected %f total mem instead of %f", i, tc.memtot, mem) + } +} + +type podOpt func(*api.Pod) +type ctOpt func(*api.Container) + +func pods(pods ...*api.Pod) <-chan *api.Pod { + ch := make(chan *api.Pod, len(pods)) + for _, x := range pods { + ch <- x + } + close(ch) + return ch +} + +func pod(opts ...podOpt) *api.Pod { + p := &api.Pod{} + for _, x := range opts { + x(p) + } + return p +} + +func container(opts ...ctOpt) (c api.Container) { + for _, x := range opts { + x(&c) + } + return +} + +func containers(ct ...api.Container) podOpt { + return podOpt(func(p *api.Pod) { + p.Spec.Containers = ct + }) +} + +func resourceLimits(cpu mresource.CPUShares, mem mresource.MegaBytes) ctOpt { + return ctOpt(func(c *api.Container) { + if c.Resources.Limits == nil { + c.Resources.Limits = make(api.ResourceList) + } + c.Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(int64(float64(cpu)*1000.0), resource.DecimalSI) + c.Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(int64(float64(mem)*1024.0*1024.0), resource.BinarySI) + }) +} + +func podName(ns, name string) podOpt { + return podOpt(func(p *api.Pod) { + p.Namespace = ns + p.Name = name + }) +} diff --git a/test/e2e/mesos.go b/test/e2e/mesos.go index eeb8e2ea6d6..f1a917fe7fe 100644 --- a/test/e2e/mesos.go +++ b/test/e2e/mesos.go @@ -17,12 +17,15 @@ limitations under the License. package e2e import ( + "fmt" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "k8s.io/kubernetes/pkg/fields" ) var _ = Describe("Mesos", func() { @@ -50,4 +53,17 @@ var _ = Describe("Mesos", func() { } Expect(len(addr)).NotTo(Equal("")) }) + + It("starts static pods on every node in the mesos cluster", func() { + client := framework.Client + expectNoError(allNodesReady(client, util.ForeverTestTimeout), "all nodes ready") + + nodelist, err := client.Nodes().List(labels.Everything(), fields.Everything()) + expectNoError(err, "nodes fetched from apiserver") + + const ns = "static-pods" + numpods := len(nodelist.Items) + expectNoError(waitForPodsRunningReady(ns, numpods, util.ForeverTestTimeout), + fmt.Sprintf("number of static pods in namespace %s is %d", ns, numpods)) + }) })