rewrite of static pod json zipper

- add busybox static pod to mesos-docker cluster
- customize static pods with binding annotations
- code cleanup
- removed hacky podtask.And func; support minimal resources for static pods when resource accounting is disabled
- removed zip archive of static pods, changed to gzip of PodList json
- pod utilities moved to package podutil
- added e2e test
- merge watched mirror pods into the mesos pod config stream
This commit is contained in:
James DeFelice 2015-09-14 17:17:08 +02:00
parent 20a99af00e
commit 3d3577b9f3
23 changed files with 909 additions and 459 deletions

View File

@ -52,8 +52,9 @@ function deploy_ui {
"${kubectl}" create -f "${KUBE_ROOT}/cluster/addons/kube-ui/kube-ui-svc.yaml"
}
# create the kube-system namespace
# create the kube-system and static-pods namespaces
"${kubectl}" create -f "${KUBE_ROOT}/cluster/mesos/docker/kube-system-ns.yaml"
"${kubectl}" create -f "${KUBE_ROOT}/cluster/mesos/docker/static-pods-ns.yaml"
if [ "${ENABLE_CLUSTER_DNS}" == true ]; then
cluster::mesos::docker::run_in_temp_dir 'k8sm-dns' 'deploy_dns'

View File

@ -141,6 +141,7 @@ scheduler:
--cluster-domain=cluster.local
--mesos-executor-cpus=1.0
--mesos-sandbox-overlay=/opt/sandbox-overlay.tar.gz
--static-pods-config=/opt/static-pods
--v=4
--executor-logv=4
--profiling=true
@ -148,6 +149,8 @@ scheduler:
- etcd
- mesosmaster1
- apiserver
volumes:
- ./static-pod.json:/opt/static-pods/static-pod.json
keygen:
image: mesosphere/kubernetes-mesos-keygen
command:

View File

@ -0,0 +1,23 @@
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "busybox",
"namespace": "static-pods"
},
"spec": {
"containers": [
{
"image": "busybox",
"command": [
"sh",
"-c",
"exec tail -f /dev/null"
],
"imagePullPolicy": "IfNotPresent",
"name": "busybox"
}
],
"restartPolicy": "Always"
}
}

View File

@ -0,0 +1,7 @@
---
apiVersion: v1
kind: Namespace
metadata:
name: static-pods
labels:
name: static-pods

View File

@ -1,137 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package archive
import (
"archive/zip"
"bytes"
"fmt"
"io"
"os"
"path"
"path/filepath"
)
// ZipWalker returns a filepath.WalkFunc that adds every filesystem node
// to the given *zip.Writer.
func ZipWalker(zw *zip.Writer) filepath.WalkFunc {
var base string
return func(path string, info os.FileInfo, err error) error {
if base == "" {
base = path
}
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
if header.Name, err = filepath.Rel(base, path); err != nil {
return err
} else if info.IsDir() {
header.Name = header.Name + string(filepath.Separator)
} else {
header.Method = zip.Deflate
}
w, err := zw.CreateHeader(header)
if err != nil {
return err
}
if info.IsDir() {
return nil
}
f, err := os.Open(path)
if err != nil {
return err
}
_, err = io.Copy(w, f)
f.Close()
return err
}
}
// Create a zip of all files in a directory recursively, return a byte array and
// the number of files archived.
func ZipDir(path string) ([]byte, []string, error) {
var buf bytes.Buffer
zw := zip.NewWriter(&buf)
zipWalker := ZipWalker(zw)
paths := []string{}
err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
paths = append(paths, path)
}
return zipWalker(path, info, err)
}))
if err != nil {
return nil, nil, err
} else if err = zw.Close(); err != nil {
return nil, nil, err
}
return buf.Bytes(), paths, nil
}
// UnzipDir unzips all files from a given zip byte array into a given directory.
// The directory is created if it does not exist yet.
func UnzipDir(data []byte, destPath string) error {
// open zip
zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
if err != nil {
return fmt.Errorf("Unzip archive read error: %v", err)
}
for _, file := range zr.File {
// skip directories
if file.FileInfo().IsDir() {
continue
}
// open file
rc, err := file.Open()
defer rc.Close()
if err != nil {
return fmt.Errorf("Unzip file read error: %v", err)
}
// make sure the directory of the file exists, otherwise create
destPath := filepath.Clean(filepath.Join(destPath, file.Name))
destBasedir := path.Dir(destPath)
err = os.MkdirAll(destBasedir, 0755)
if err != nil {
return fmt.Errorf("Unzip mkdir error: %v", err)
}
// create file
f, err := os.Create(destPath)
if err != nil {
return fmt.Errorf("Unzip file creation error: %v", err)
}
defer f.Close()
// write file
if _, err := io.Copy(f, rc); err != nil {
return fmt.Errorf("Unzip file write error: %v", err)
}
}
return nil
}

View File

@ -1,66 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package archive
import (
"archive/zip"
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"
)
func TestZipWalker(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "")
if err != nil {
t.Fatal(err)
}
tree := map[string]string{"a/b/c": "12345", "a/b/d": "54321", "a/e": "00000"}
for path, content := range tree {
path = filepath.Join(dir, path)
if err := os.MkdirAll(filepath.Dir(path), os.ModeTemporary|0700); err != nil {
t.Fatal(err)
} else if err = ioutil.WriteFile(path, []byte(content), 0700); err != nil {
t.Fatal(err)
}
}
var buf bytes.Buffer
zw := zip.NewWriter(&buf)
if err := filepath.Walk(dir, ZipWalker(zw)); err != nil {
t.Fatal(err)
} else if err = zw.Close(); err != nil {
t.Fatal(err)
}
zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
if err != nil {
t.Fatal(err)
}
for _, file := range zr.File {
if rc, err := file.Open(); err != nil {
t.Fatal(err)
} else if got, err := ioutil.ReadAll(rc); err != nil {
t.Error(err)
} else if want := []byte(tree[file.Name]); !bytes.Equal(got, want) {
t.Errorf("%s\ngot: %s\nwant: %s", file.Name, got, want)
}
}
}

View File

@ -30,9 +30,9 @@ import (
bindings "github.com/mesos/mesos-go/executor"
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
"k8s.io/kubernetes/contrib/mesos/pkg/archive"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
unversionedapi "k8s.io/kubernetes/pkg/api/unversioned"
@ -205,11 +205,19 @@ func (k *Executor) isDone() bool {
}
}
// sendPodUpdate assumes that caller is holding state lock; returns true when update is sent otherwise false
func (k *Executor) sendPodUpdate(u *kubetypes.PodUpdate) bool {
// sendPodsSnapshot assumes that caller is holding state lock; returns true when update is sent otherwise false
func (k *Executor) sendPodsSnapshot() bool {
if k.isDone() {
return false
}
snapshot := make([]*api.Pod, 0, len(k.pods))
for _, v := range k.pods {
snapshot = append(snapshot, v)
}
u := &kubetypes.PodUpdate{
Op: kubetypes.SET,
Pods: snapshot,
}
k.updateChan <- *u
return true
}
@ -227,7 +235,10 @@ func (k *Executor) Registered(driver bindings.ExecutorDriver,
}
if executorInfo != nil && executorInfo.Data != nil {
k.initializeStaticPodsSource(executorInfo.Data)
err := k.initializeStaticPodsSource(slaveInfo.GetHostname(), executorInfo.Data)
if err != nil {
log.Errorf("failed to initialize static pod configuration: %v", err)
}
}
if slaveInfo != nil {
@ -240,10 +251,7 @@ func (k *Executor) Registered(driver bindings.ExecutorDriver,
// emit an empty update to allow the mesos "source" to be marked as seen
k.lock.Lock()
defer k.lock.Unlock()
k.sendPodUpdate(&kubetypes.PodUpdate{
Pods: []*api.Pod{},
Op: kubetypes.SET,
})
k.sendPodsSnapshot()
if slaveInfo != nil && k.nodeInfos != nil {
k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics
@ -280,13 +288,15 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos
}
// initializeStaticPodsSource unzips the data slice into the static-pods directory
func (k *Executor) initializeStaticPodsSource(data []byte) {
func (k *Executor) initializeStaticPodsSource(hostname string, data []byte) error {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
err := archive.UnzipDir(data, k.staticPodsConfigPath)
if err != nil {
log.Errorf("Failed to extract static pod config: %v", err)
return
}
// annotate the pod with BindingHostKey so that the scheduler will ignore the pod
// once it appears in the pod registry. the stock kubelet sets the pod host in order
// to accomplish the same; we do this because the k8sm scheduler works differently.
annotator := podutil.Annotator(map[string]string{
meta.BindingHostKey: hostname,
})
return podutil.WriteToDir(annotator.Do(podutil.Gunzip(data)), k.staticPodsConfigPath)
}
// Disconnected is called when the executor is disconnected from the slave.
@ -390,10 +400,7 @@ func (k *Executor) handleChangedApiserverPod(pod *api.Pod) {
oldPod.DeletionTimestamp = pod.DeletionTimestamp
oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.UPDATE,
Pods: []*api.Pod{oldPod},
})
k.sendPodsSnapshot()
}
}
}
@ -557,17 +564,14 @@ func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod
//TODO(jdef) check for duplicate pod name, if found send TASK_ERROR
// send the new pod to the kubelet which will spin it up
ok := k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.ADD,
Pods: []*api.Pod{pod},
})
if !ok {
return // executor is terminating, cancel launch
}
// mark task as sent by setting the podName and register the sent pod
task.podName = podFullName
k.pods[podFullName] = pod
ok := k.sendPodsSnapshot()
if !ok {
task.podName = ""
delete(k.pods, podFullName)
return // executor is terminating, cancel launch
}
// From here on, we need to delete containers associated with the task upon
// it going into a terminal state.
@ -752,7 +756,7 @@ func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason str
k.resetSuicideWatch(driver)
pid := task.podName
pod, found := k.pods[pid]
_, found := k.pods[pid]
if !found {
log.Warningf("Cannot remove unknown pod %v for task %v", pid, tid)
} else {
@ -760,10 +764,7 @@ func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason str
delete(k.pods, pid)
// tell the kubelet to remove the pod
k.sendPodUpdate(&kubetypes.PodUpdate{
Op: kubetypes.REMOVE,
Pods: []*api.Pod{pod},
})
k.sendPodsSnapshot()
}
// TODO(jdef): ensure that the update propagates, perhaps return a signal chan?
k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason))

View File

@ -17,14 +17,13 @@ limitations under the License.
package executor
import (
"archive/zip"
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"sync"
"sync/atomic"
@ -33,6 +32,7 @@ import (
assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
kmruntime "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
@ -41,7 +41,6 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet"
kconfig "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
@ -49,7 +48,6 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
@ -246,123 +244,79 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
// TestExecutorStaticPods test that the ExecutorInfo.data is parsed
// as a zip archive with pod definitions.
func TestExecutorStaticPods(t *testing.T) {
func TestExecutorInitializeStaticPodsSource(t *testing.T) {
// create some zip with static pod definition
var buf bytes.Buffer
zw := zip.NewWriter(&buf)
createStaticPodFile := func(fileName, id, name string) {
w, err := zw.Create(fileName)
assert.NoError(t, err)
spod := `{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "%v",
"labels": { "name": "foo", "cluster": "bar" }
},
"spec": {
"containers": [{
"name": "%v",
"image": "library/nginx",
"ports": [{ "containerPort": 80, "name": "http" }],
"livenessProbe": {
"enabled": true,
"type": "http",
"initialDelaySeconds": 30,
"httpGet": { "path": "/", "port": 80 }
givenPodsDir, err := ioutil.TempDir("/tmp", "executor-givenpods")
assert.NoError(t, err)
defer os.RemoveAll(givenPodsDir)
var wg sync.WaitGroup
reportErrors := func(errCh <-chan error) {
wg.Add(1)
go func() {
defer wg.Done()
for err := range errCh {
t.Error(err)
}
}]
}()
}
}`
_, err = w.Write([]byte(fmt.Sprintf(spod, id, name)))
createStaticPodFile := func(fileName, name string) {
spod := `{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "%v",
"namespace": "staticpods",
"labels": { "name": "foo", "cluster": "bar" }
},
"spec": {
"containers": [{
"name": "%v",
"image": "library/nginx",
"ports": [{ "containerPort": 80, "name": "http" }]
}]
}
}`
destfile := filepath.Join(givenPodsDir, fileName)
err = os.MkdirAll(filepath.Dir(destfile), 0770)
assert.NoError(t, err)
err = ioutil.WriteFile(destfile, []byte(fmt.Sprintf(spod, name, name)), 0660)
assert.NoError(t, err)
}
createStaticPodFile("spod.json", "spod-id-01", "spod-01")
createStaticPodFile("spod2.json", "spod-id-02", "spod-02")
createStaticPodFile("dir/spod.json", "spod-id-03", "spod-03") // same file name as first one to check for overwriting
expectedStaticPodsNum := 2 // subdirectories are ignored by FileSource, hence only 2
createStaticPodFile("spod.json", "spod-01")
createStaticPodFile("spod2.json", "spod-02")
createStaticPodFile("dir/spod.json", "spod-03") // same file name as first one to check for overwriting
staticpods, errs := podutil.ReadFromDir(givenPodsDir)
reportErrors(errs)
err := zw.Close()
gzipped, err := podutil.Gzip(staticpods)
assert.NoError(t, err)
// create fake apiserver
testApiServer := NewTestServer(t, api.NamespaceDefault, nil)
defer testApiServer.server.Close()
expectedStaticPodsNum := 2 // subdirectories are ignored by FileSource, hence only 2
// temporary directory which is normally located in the executor sandbox
staticPodsConfigPath, err := ioutil.TempDir("/tmp", "executor-k8sm-archive")
assert.NoError(t, err)
defer os.RemoveAll(staticPodsConfigPath)
mockDriver := &MockExecutorDriver{}
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init
NodeInfos: make(chan NodeInfo, 1),
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
}),
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "foo",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
},
},
Phase: api.PodRunning,
}, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
executor := &Executor{
staticPodsConfigPath: staticPodsConfigPath,
}
executor := New(config)
// register static pod source
// extract the pods into staticPodsConfigPath
hostname := "h1"
fileSourceUpdates := make(chan interface{}, 1024)
kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, fileSourceUpdates)
err = executor.initializeStaticPodsSource(hostname, gzipped)
assert.NoError(t, err)
// create ExecutorInfo with static pod zip in data field
executorInfo := mesosutil.NewExecutorInfo(
mesosutil.NewExecutorID("ex1"),
mesosutil.NewCommandInfo("k8sm-executor"),
)
executorInfo.Data = buf.Bytes()
actualpods, errs := podutil.ReadFromDir(staticPodsConfigPath)
reportErrors(errs)
// start the executor with the static pod data
executor.Init(mockDriver)
executor.Registered(mockDriver, executorInfo, nil, nil)
// wait for static pod to start
seenPods := map[string]struct{}{}
timeout := time.After(util.ForeverTestTimeout)
defer mockDriver.AssertExpectations(t)
for {
// filter by PodUpdate type
select {
case <-timeout:
t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods))
case update, ok := <-fileSourceUpdates:
if !ok {
return
}
podUpdate, ok := update.(kubetypes.PodUpdate)
if !ok {
continue
}
for _, pod := range podUpdate.Pods {
seenPods[pod.Name] = struct{}{}
}
if len(seenPods) == expectedStaticPodsNum {
return
}
}
}
list := podutil.List(actualpods)
assert.NotNil(t, list)
assert.Equal(t, expectedStaticPodsNum, len(list.Items))
wg.Wait()
}
// TestExecutorFrameworkMessage ensures that the executor is able to

View File

@ -0,0 +1,128 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"fmt"
log "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
const (
// if we don't use this source then the kubelet will do funny, mirror things. we alias
// this here for convenience. see the docs for sourceMesos for additional explanation.
// @see ConfigSourceAnnotationKey
mesosSource = kubetypes.ApiserverSource
)
// sourceMesos merges pods from mesos, and mirror pods from the apiserver. why?
// (a) can't have two sources with the same name;
// (b) all sources, other than ApiserverSource are considered static/mirror
// sources, and;
// (c) kubelet wants to see mirror pods reflected in a non-static source.
//
// Mesos pods must appear to come from apiserver due to (b), while reflected
// static pods (mirror pods) must appear to come from apiserver due to (c).
//
// The only option I could think of was creating a source that merges the pod
// streams. I don't like it. But I could think of anything else, other than
// starting to hack up the kubelet's understanding of mirror/static pod
// sources (ouch!)
type sourceMesos struct {
sourceFinished chan struct{} // sourceFinished closes when mergeAndForward exits
out chan<- interface{} // out is the sink for merged pod snapshots
mirrorPods chan []*api.Pod // mirrorPods communicates snapshots of the current set of mirror pods
execUpdates <-chan kubetypes.PodUpdate // execUpdates receives snapshots of the current set of mesos pods
}
// newSourceMesos creates a pod config source that merges pod updates from
// mesos (via execUpdates), and mirror pod updates from the apiserver (via
// podWatch) writing the merged update stream to the out chan. It is expected
// that execUpdates will only ever contain SET operations. The source takes
// ownership of the sourceFinished chan, closing it when the source terminates.
// Source termination happens when the execUpdates chan is closed and fully
// drained of updates.
func newSourceMesos(
sourceFinished chan struct{},
execUpdates <-chan kubetypes.PodUpdate,
out chan<- interface{},
podWatch *cache.ListWatch,
) {
source := &sourceMesos{
sourceFinished: sourceFinished,
mirrorPods: make(chan []*api.Pod),
execUpdates: execUpdates,
out: out,
}
// reflect changes from the watch into a chan, filtered to include only mirror pods (have an ConfigMirrorAnnotationKey attr)
cache.NewReflector(podWatch, &api.Pod{}, cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc), 0).RunUntil(sourceFinished)
go source.mergeAndForward()
}
func (source *sourceMesos) send(objs []interface{}) {
var mirrors []*api.Pod
for _, o := range objs {
p := o.(*api.Pod)
if _, ok := p.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok {
mirrors = append(mirrors, p)
}
}
select {
case <-source.sourceFinished:
case source.mirrorPods <- mirrors:
}
}
func (source *sourceMesos) mergeAndForward() {
// execUpdates will be closed by the executor on shutdown
defer close(source.sourceFinished)
var (
mirrors = []*api.Pod{}
pods = []*api.Pod{}
)
eventLoop:
for {
select {
case m := <-source.mirrorPods:
mirrors = m[:]
u := kubetypes.PodUpdate{
Op: kubetypes.SET,
Pods: append(m, pods...),
Source: mesosSource,
}
log.V(3).Infof("mirror update, sending snapshot of size %d", len(u.Pods))
source.out <- u
case u, ok := <-source.execUpdates:
if !ok {
break eventLoop
}
if u.Op != kubetypes.SET {
panic(fmt.Sprintf("unexpected Op type: %v", u.Op))
}
pods = u.Pods[:]
u.Pods = append(u.Pods, mirrors...)
u.Source = mesosSource
log.V(3).Infof("pods update, sending snapshot of size %d", len(u.Pods))
source.out <- u
}
}
log.V(2).Infoln("mesos pod source terminating normally")
}

View File

@ -42,12 +42,6 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
const (
// if we don't use this source then the kubelet will do funny, mirror things.
// @see ConfigSourceAnnotationKey
MESOS_CFG_SOURCE = kubetypes.ApiserverSource
)
type KubeletExecutorServer struct {
*app.KubeletServer
SuicideTimeout time.Duration
@ -79,8 +73,14 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.")
}
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, nodeInfos chan<- executor.NodeInfo, kubeletFinished <-chan struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
func (s *KubeletExecutorServer) runExecutor(
execUpdates chan<- kubetypes.PodUpdate,
nodeInfos chan<- executor.NodeInfo,
kubeletFinished <-chan struct{},
staticPodsConfigPath string,
apiclient *client.Client,
podLW *cache.ListWatch,
) error {
exec := executor.New(executor.Config{
Updates: execUpdates,
APIClient: apiclient,
@ -111,10 +111,8 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
return status, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
),
NodeInfos: nodeInfos,
PodLW: podLW,
NodeInfos: nodeInfos,
})
// initialize driver and initialize the executor with it
@ -141,8 +139,14 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
return nil
}
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, nodeInfos <-chan executor.NodeInfo, kubeletDone chan<- struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
func (s *KubeletExecutorServer) runKubelet(
execUpdates <-chan kubetypes.PodUpdate,
nodeInfos <-chan executor.NodeInfo,
kubeletDone chan<- struct{},
staticPodsConfigPath string,
apiclient *client.Client,
podLW *cache.ListWatch,
) error {
kcfg, err := s.UnsecuredKubeletConfig()
if err == nil {
// apply Messo specific settings
@ -199,17 +203,8 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
}
}()
// create main pod source
updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE)
go func() {
// execUpdates will be closed by the executor on shutdown
defer close(executorDone)
for u := range execUpdates {
u.Source = MESOS_CFG_SOURCE
updates <- u
}
}()
// create main pod source, it will close executorDone when the executor updates stop flowing
newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW)
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
@ -257,14 +252,18 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
return fmt.Errorf("cannot create API client: %v", err)
}
pw := cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
)
// start executor
err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient)
err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw)
if err != nil {
return err
}
// start kubelet, blocking
return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient)
return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw)
}
func defaultBindingAddress() string {

View File

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package archive provides utilities to archive and unarchive filesystem
// hierarchies.
package archive
// podutil contains utilities for reading, writing and filtering streams
// and lists of api.Pod objects.
package podutil

View File

@ -0,0 +1,90 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podutil
import (
log "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
)
type defaultFunc func(pod *api.Pod) error
type FilterFunc func(pod *api.Pod) (bool, error)
type Filters []FilterFunc
// Annotate safely copies annotation metadata from kv to meta.Annotations.
func Annotate(meta *api.ObjectMeta, kv map[string]string) {
//TODO(jdef) this func probably belong in an "apiutil" package, but we don't
//have much to put there right now so it can just live here.
if meta.Annotations == nil {
meta.Annotations = make(map[string]string)
}
for k, v := range kv {
meta.Annotations[k] = v
}
}
// Annotator returns a filter that copies annotations from map m into a pod
func Annotator(m map[string]string) FilterFunc {
return FilterFunc(func(pod *api.Pod) (bool, error) {
Annotate(&pod.ObjectMeta, m)
return true, nil
})
}
// Stream returns a chan of pods that yields each pod from the given list.
// No pods are yielded if err is non-nil.
func Stream(list *api.PodList, err error) <-chan *api.Pod {
out := make(chan *api.Pod)
go func() {
defer close(out)
if err != nil {
log.Errorf("failed to obtain pod list: %v", err)
return
}
for _, pod := range list.Items {
pod := pod
out <- &pod
}
}()
return out
}
func (filter FilterFunc) Do(in <-chan *api.Pod) <-chan *api.Pod {
out := make(chan *api.Pod)
go func() {
defer close(out)
for pod := range in {
if ok, err := filter(pod); err != nil {
log.Errorf("pod failed selection: %v", err)
} else if ok {
out <- pod
}
}
}()
return out
}
// List reads every pod from the pods chan and returns them all in an api.PodList
func List(pods <-chan *api.Pod) *api.PodList {
list := &api.PodList{}
for p := range pods {
list.Items = append(list.Items, *p)
}
return list
}

View File

@ -0,0 +1,81 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podutil
import (
"bytes"
"compress/gzip"
"fmt"
"io/ioutil"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
)
func Gzip(pods <-chan *api.Pod) ([]byte, error) {
return gzipList(List(pods))
}
func gzipList(list *api.PodList) ([]byte, error) {
raw, err := v1.Codec.Encode(list)
if err != nil {
return nil, err
}
zipped := &bytes.Buffer{}
zw := gzip.NewWriter(zipped)
_, err = bytes.NewBuffer(raw).WriteTo(zw)
if err != nil {
return nil, err
}
err = zw.Close()
if err != nil {
return nil, err
}
return zipped.Bytes(), nil
}
func Gunzip(gzipped []byte) <-chan *api.Pod {
return Stream(gunzipList(gzipped))
}
func gunzipList(gzipped []byte) (*api.PodList, error) {
zr, err := gzip.NewReader(bytes.NewReader(gzipped))
if err != nil {
return nil, err
}
defer zr.Close()
raw, err := ioutil.ReadAll(zr)
if err != nil {
return nil, err
}
obj, err := api.Scheme.Decode(raw)
if err != nil {
return nil, err
}
podlist, ok := obj.(*api.PodList)
if !ok {
return nil, fmt.Errorf("expected *api.PodList instead of %T", obj)
}
return podlist, nil
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podutil
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
)
func TestGzipList(t *testing.T) {
// pod spec defaults are written during deserialization, this is what we
// expect them to be
period := int64(v1.DefaultTerminationGracePeriodSeconds)
defaultSpec := api.PodSpec{
DNSPolicy: api.DNSClusterFirst,
RestartPolicy: api.RestartPolicyAlways,
TerminationGracePeriodSeconds: &period,
SecurityContext: new(api.PodSecurityContext),
}
list := &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "bar",
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "qax",
Namespace: "lkj",
},
},
},
}
amap := map[string]string{
"crazy": "horse",
}
annotator := Annotator(amap)
raw, err := Gzip(annotator.Do(Stream(list, nil)))
assert.NoError(t, err)
list2, err := gunzipList(raw)
assert.NoError(t, err)
list.Items[0].Spec = defaultSpec
list.Items[0].Annotations = amap
list.Items[1].Spec = defaultSpec
list.Items[1].Annotations = amap
assert.True(t, api.Semantic.DeepEqual(*list, *list2), "expected %+v instead of %+v", *list, *list2)
}

View File

@ -0,0 +1,123 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podutil
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/validation"
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
)
func WriteToDir(pods <-chan *api.Pod, destDir string) error {
err := os.MkdirAll(destDir, 0660)
if err != nil {
return err
}
for p := range pods {
filename, ok := p.Annotations[meta.StaticPodFilenameKey]
if !ok {
log.Warningf("skipping static pod %s/%s that had no filename", p.Namespace, p.Name)
continue
}
raw, err := v1.Codec.Encode(p)
if err != nil {
log.Errorf("failed to encode static pod as v1 object: %v", err)
continue
}
destfile := filepath.Join(destDir, filename)
err = ioutil.WriteFile(destfile, raw, 0660)
if err != nil {
log.Errorf("failed to write static pod file %q: %v", destfile, err)
}
log.V(1).Infof("wrote static pod %s/%s to %s", p.Namespace, p.Name, destfile)
}
return nil
}
func ReadFromDir(dirpath string) (<-chan *api.Pod, <-chan error) {
pods := make(chan *api.Pod)
errors := make(chan error)
go func() {
defer close(pods)
defer close(errors)
files, err := ioutil.ReadDir(dirpath)
if err != nil {
errors <- fmt.Errorf("error scanning static pods directory: %q: %v", dirpath, err)
return
}
for _, f := range files {
if f.IsDir() || f.Size() == 0 {
continue
}
filename := filepath.Join(dirpath, f.Name())
log.V(1).Infof("reading static pod conf from file %q", filename)
data, err := ioutil.ReadFile(filename)
if err != nil {
errors <- fmt.Errorf("failed to read static pod file: %q: %v", filename, err)
continue
}
parsed, pod, err := tryDecodeSinglePod(data)
if !parsed {
if err != nil {
errors <- fmt.Errorf("error parsing static pod file %q: %v", filename, err)
}
continue
}
if err != nil {
errors <- fmt.Errorf("error validating static pod file %q: %v", filename, err)
continue
}
Annotate(&pod.ObjectMeta, map[string]string{meta.StaticPodFilenameKey: f.Name()})
pods <- pod
}
}()
return pods, errors
}
// tryDecodeSinglePod was copied from pkg/kubelet/config/common.go v1.0.5
func tryDecodeSinglePod(data []byte) (parsed bool, pod *api.Pod, err error) {
// JSON is valid YAML, so this should work for everything.
json, err := utilyaml.ToJSON(data)
if err != nil {
return false, nil, err
}
obj, err := api.Scheme.Decode(json)
if err != nil {
return false, pod, err
}
// Check whether the object could be converted to single pod.
if _, ok := obj.(*api.Pod); !ok {
err = fmt.Errorf("invalid pod: %+v", obj)
return false, pod, err
}
newPod := obj.(*api.Pod)
if errs := validation.ValidatePod(newPod); len(errs) > 0 {
err = fmt.Errorf("invalid pod: %v", errs)
return true, pod, err
}
return true, newPod, nil
}

View File

@ -547,7 +547,6 @@ func (k *framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.Sl
// ExecutorLost is called when some executor is lost.
func (k *framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) {
log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status)
// TODO(yifan): Restart any unfinished tasks of the executor.
}
// Error is called when there is an unrecoverable error in the scheduler or scheduler driver.

View File

@ -30,4 +30,5 @@ const (
PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_"
PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s"
ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d"
StaticPodFilenameKey = "k8s.mesosphere.io/staticPodFilename"
)

View File

@ -33,6 +33,5 @@ type Scheduler interface {
Reconcile(t *podtask.T)
KillTask(id string) error
LaunchTask(t *podtask.T) error
Run(done <-chan struct{})
}

View File

@ -18,7 +18,6 @@ package service
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
@ -47,11 +46,11 @@ import (
"github.com/spf13/pflag"
"golang.org/x/net/context"
"k8s.io/kubernetes/contrib/mesos/pkg/archive"
"k8s.io/kubernetes/contrib/mesos/pkg/election"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
minioncfg "k8s.io/kubernetes/contrib/mesos/pkg/minion/config"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/profile"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components"
@ -75,6 +74,9 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
// lock to this API version, compilation will fail when this becomes unsupported
_ "k8s.io/kubernetes/pkg/api/v1"
)
const (
@ -431,46 +433,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
}
// Check for staticPods
var staticPodCPUs, staticPodMem float64
if s.staticPodsConfigPath != "" {
bs, paths, err := archive.ZipDir(s.staticPodsConfigPath)
if err != nil {
return nil, nil, err
}
// try to read pod files and sum resources
// TODO(sttts): don't terminate when static pods are broken, but skip them
// TODO(sttts): add a directory watch and tell running executors about updates
for _, podPath := range paths {
podJson, err := ioutil.ReadFile(podPath)
if err != nil {
return nil, nil, fmt.Errorf("error reading static pod spec: %v", err)
}
pod := api.Pod{}
err = json.Unmarshal(podJson, &pod)
if err != nil {
return nil, nil, fmt.Errorf("error parsing static pod spec at %v: %v", podPath, err)
}
_, cpu, _, err := mresource.LimitPodCPU(&pod, s.defaultContainerCPULimit)
if err != nil {
return nil, nil, fmt.Errorf("cannot derive cpu limit for static pod: %v", podPath)
}
_, mem, _, err := mresource.LimitPodMem(&pod, s.defaultContainerMemLimit)
if err != nil {
return nil, nil, fmt.Errorf("cannot derive memory limit for static pod: %v", podPath)
}
log.V(2).Infof("reserving %.2f cpu shares and %.2f MB of memory to static pod %s", cpu, mem, pod.Name)
staticPodCPUs += float64(cpu)
staticPodMem += float64(mem)
}
// pass zipped pod spec to executor
execInfo.Data = bs
}
data, staticPodCPUs, staticPodMem := s.prepareStaticPods()
execInfo.Resources = []*mesos.Resource{
mutil.NewScalarResource("cpus", float64(s.mesosExecutorCPUs)+staticPodCPUs),
@ -482,10 +445,43 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
ehash := hashExecutorInfo(execInfo)
eid := uid.New(ehash, execcfg.DefaultInfoID)
execInfo.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())}
execInfo.Data = data
return execInfo, eid, nil
}
func (s *SchedulerServer) prepareStaticPods() (data []byte, staticPodCPUs, staticPodMem float64) {
// TODO(sttts): add a directory watch and tell running executors about updates
if s.staticPodsConfigPath == "" {
return
}
entries, errCh := podutil.ReadFromDir(s.staticPodsConfigPath)
go func() {
// we just skip file system errors for now, do our best to gather
// as many static pod specs as we can.
for err := range errCh {
log.Errorln(err.Error())
}
}()
// validate cpu and memory limits, tracking the running totals in staticPod{CPUs,Mem}
validateResourceLimits := StaticPodValidator(
s.defaultContainerCPULimit,
s.defaultContainerMemLimit,
&staticPodCPUs,
&staticPodMem)
zipped, err := podutil.Gzip(validateResourceLimits.Do(entries))
if err != nil {
log.Errorf("failed to generate static pod data: %v", err)
staticPodCPUs, staticPodMem = 0, 0
} else {
data = zipped
}
return
}
// TODO(jdef): hacked from kubelet/server/server.go
// TODO(k8s): replace this with clientcmd
func (s *SchedulerServer) createAPIServerClient() (*client.Client, error) {

View File

@ -1,5 +1,3 @@
// +build unit_test
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
@ -19,15 +17,9 @@ limitations under the License.
package service
import (
"archive/zip"
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"k8s.io/kubernetes/contrib/mesos/pkg/archive"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"github.com/stretchr/testify/assert"
@ -124,42 +116,3 @@ func Test_DefaultResourceLimits(t *testing.T) {
assert.Equal(s.defaultContainerCPULimit, mresource.DefaultDefaultContainerCPULimit)
assert.Equal(s.defaultContainerMemLimit, mresource.DefaultDefaultContainerMemLimit)
}
func Test_StaticPods(t *testing.T) {
assert := assert.New(t)
// create static pods config files, spod1 on toplevel and spod2 in a directory "dir"
staticPodsConfigPath, err := ioutil.TempDir(os.TempDir(), "executor-k8sm-archive")
assert.NoError(err)
defer os.RemoveAll(staticPodsConfigPath)
spod1, err := os.Create(filepath.Join(staticPodsConfigPath, "spod1.json"))
assert.NoError(err)
_, err = spod1.WriteString("content1")
assert.NoError(err)
err = os.Mkdir(filepath.Join(staticPodsConfigPath, "dir"), 0755)
assert.NoError(err)
spod2, err := os.Create(filepath.Join(staticPodsConfigPath, "dir", "spod2.json"))
assert.NoError(err)
_, err = spod2.WriteString("content2")
assert.NoError(err)
// archive config files
data, paths, err := archive.ZipDir(staticPodsConfigPath)
assert.NoError(err)
assert.Equal(2, len(paths))
// unarchive config files
zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
assert.NoError(err)
fileNames := []string{}
for _, f := range zr.File {
if !f.FileInfo().IsDir() {
fileNames = append(fileNames, f.Name)
}
}
assert.Contains(fileNames, "spod1.json")
assert.Contains(fileNames, "dir/spod2.json")
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
)
// StaticPodValidator discards a pod if we can't calculate resource limits for it.
func StaticPodValidator(
defaultContainerCPULimit resource.CPUShares,
defaultContainerMemLimit resource.MegaBytes,
accumCPU, accumMem *float64,
) podutil.FilterFunc {
return podutil.FilterFunc(func(pod *api.Pod) (bool, error) {
_, cpu, _, err := resource.LimitPodCPU(pod, defaultContainerCPULimit)
if err != nil {
return false, err
}
_, mem, _, err := resource.LimitPodMem(pod, defaultContainerMemLimit)
if err != nil {
return false, err
}
log.V(2).Infof("reserving %.2f cpu shares and %.2f MB of memory to static pod %s/%s", cpu, mem, pod.Namespace, pod.Name)
*accumCPU += float64(cpu)
*accumMem += float64(mem)
return true, nil
})
}

View File

@ -0,0 +1,161 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service_test
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/service"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)
func TestStaticPodValidator(t *testing.T) {
// test within limits
tests := []struct {
// given
pods <-chan *api.Pod
// wants
podcount int
cputot float64
memtot float64
}{
// test: valid, pod specifies limits for ALL containers
{
pods: pods(pod(
podName("foo", "bar"),
containers(
container(resourceLimits(10, 20)), // min is 32
container(resourceLimits(30, 40)),
container(resourceLimits(50, 60)),
),
)),
podcount: 1,
cputot: 90,
memtot: 132,
},
// test: valid, multiple pods, specify limits for ALL containers
{
pods: pods(
pod(
podName("foo", "bar"),
containers(
container(resourceLimits(10, 20)), // min is 32
container(resourceLimits(30, 40)),
container(resourceLimits(50, 60)),
),
),
pod(
podName("kjh", "jkk"),
containers(
container(resourceLimits(15, 25)), // min is 32
container(resourceLimits(35, 45)),
container(resourceLimits(55, 65)),
),
),
),
podcount: 2,
cputot: 195,
memtot: 274,
},
// test: no limits on CT in first pod so it's rejected
{
pods: pods(
pod(
podName("foo", "bar"),
containers(
container(resourceLimits(10, 20)), // min is 32
container(), // min is 0.01, 32
container(resourceLimits(50, 60)),
),
),
pod(
podName("wza", "wer"),
containers(
container(resourceLimits(10, 20)), // min is 32
container(resourceLimits(30, 40)),
container(resourceLimits(50, 60)),
),
),
),
podcount: 2,
cputot: 60.01 + 90,
memtot: 124 + 132,
},
}
for i, tc := range tests {
var cpu, mem float64
f := service.StaticPodValidator(0, 0, &cpu, &mem)
list := podutil.List(f.Do(tc.pods))
assert.Equal(t, tc.podcount, len(list.Items), "test case #%d: expected %d pods instead of %d", i, tc.podcount, len(list.Items))
assert.EqualValues(t, tc.cputot, cpu, "test case #%d: expected %f total cpu instead of %f", i, tc.cputot, cpu)
assert.EqualValues(t, tc.memtot, mem, "test case #%d: expected %f total mem instead of %f", i, tc.memtot, mem)
}
}
type podOpt func(*api.Pod)
type ctOpt func(*api.Container)
func pods(pods ...*api.Pod) <-chan *api.Pod {
ch := make(chan *api.Pod, len(pods))
for _, x := range pods {
ch <- x
}
close(ch)
return ch
}
func pod(opts ...podOpt) *api.Pod {
p := &api.Pod{}
for _, x := range opts {
x(p)
}
return p
}
func container(opts ...ctOpt) (c api.Container) {
for _, x := range opts {
x(&c)
}
return
}
func containers(ct ...api.Container) podOpt {
return podOpt(func(p *api.Pod) {
p.Spec.Containers = ct
})
}
func resourceLimits(cpu mresource.CPUShares, mem mresource.MegaBytes) ctOpt {
return ctOpt(func(c *api.Container) {
if c.Resources.Limits == nil {
c.Resources.Limits = make(api.ResourceList)
}
c.Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(int64(float64(cpu)*1000.0), resource.DecimalSI)
c.Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(int64(float64(mem)*1024.0*1024.0), resource.BinarySI)
})
}
func podName(ns, name string) podOpt {
return podOpt(func(p *api.Pod) {
p.Namespace = ns
p.Name = name
})
}

View File

@ -17,12 +17,15 @@ limitations under the License.
package e2e
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/fields"
)
var _ = Describe("Mesos", func() {
@ -50,4 +53,17 @@ var _ = Describe("Mesos", func() {
}
Expect(len(addr)).NotTo(Equal(""))
})
It("starts static pods on every node in the mesos cluster", func() {
client := framework.Client
expectNoError(allNodesReady(client, util.ForeverTestTimeout), "all nodes ready")
nodelist, err := client.Nodes().List(labels.Everything(), fields.Everything())
expectNoError(err, "nodes fetched from apiserver")
const ns = "static-pods"
numpods := len(nodelist.Items)
expectNoError(waitForPodsRunningReady(ns, numpods, util.ForeverTestTimeout),
fmt.Sprintf("number of static pods in namespace %s is %d", ns, numpods))
})
})