Read BoundPods from etcd instead of ContainerManifestList

There are three values that uniquely identify a pod on a host -
the configuration source (etcd, file, http), the pod name, and the
pod namespace. This change ensures that configuration properly
makes those names unique by changing podFullName to contain both
name (currently ID in v1beta1, Name in v1beta3) and namespace.

The Kubelet does not properly handle information requests for
pods not in the default namespace at this time.
This commit is contained in:
Clayton Coleman
2014-10-08 15:56:02 -04:00
committed by Eric Paris
parent 332a03b085
commit 892942af8f
25 changed files with 801 additions and 439 deletions

View File

@@ -21,7 +21,9 @@ import (
"reflect"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
@@ -89,7 +91,7 @@ func (c *PodConfig) Sync() {
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod name to pod reference
pods map[string]map[string]*kubelet.Pod
pods map[string]map[string]*api.BoundPod
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order
@@ -103,7 +105,7 @@ type podStorage struct {
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*kubelet.Pod),
pods: make(map[string]map[string]*api.BoundPod),
mode: mode,
updates: updates,
}
@@ -136,12 +138,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
s.updates <- *updates
}
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
}
case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
}
default:
@@ -161,7 +163,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
pods := s.pods[source]
if pods == nil {
pods = make(map[string]*kubelet.Pod)
pods = make(map[string]*api.BoundPod)
}
update := change.(kubelet.PodUpdate)
@@ -175,11 +177,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
filtered := filterInvalidPods(update.Pods, source)
for _, ref := range filtered {
name := ref.Name
name := podUniqueName(ref)
if existing, found := pods[name]; found {
if !reflect.DeepEqual(existing.Manifest, ref.Manifest) {
if !reflect.DeepEqual(existing.Spec, ref.Spec) {
// this is an update
existing.Manifest = ref.Manifest
existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, *existing)
continue
}
@@ -187,7 +189,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
continue
}
// this is an add
ref.Namespace = source
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
pods[name] = ref
adds.Pods = append(adds.Pods, *ref)
}
@@ -195,7 +200,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
case kubelet.REMOVE:
glog.V(4).Infof("Removing a pod %v", update)
for _, value := range update.Pods {
name := value.Name
name := podUniqueName(&value)
if existing, found := pods[name]; found {
// this is a delete
delete(pods, name)
@@ -209,23 +214,26 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
glog.V(4).Infof("Setting pods for source %s : %v", source, update)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[string]*kubelet.Pod)
pods = make(map[string]*api.BoundPod)
filtered := filterInvalidPods(update.Pods, source)
for _, ref := range filtered {
name := ref.Name
name := podUniqueName(ref)
if existing, found := oldPods[name]; found {
pods[name] = existing
if !reflect.DeepEqual(existing.Manifest, ref.Manifest) {
if !reflect.DeepEqual(existing.Spec, ref.Spec) {
// this is an update
existing.Manifest = ref.Manifest
existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, *existing)
continue
}
// this is a no-op
continue
}
ref.Namespace = source
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
pods[name] = ref
adds.Pods = append(adds.Pods, *ref)
}
@@ -246,20 +254,21 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
return adds, updates, deletes
}
func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.Pod) {
func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) {
names := util.StringSet{}
for i := range pods {
var errors []error
if names.Has(pods[i].Name) {
errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].Name))
name := podUniqueName(&pods[i])
if names.Has(name) {
errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].ID))
} else {
names.Insert(pods[i].Name)
names.Insert(name)
}
if errs := kubelet.ValidatePod(&pods[i]); len(errs) != 0 {
if errs := validation.ValidateBoundPod(&pods[i]); len(errs) != 0 {
errors = append(errors, errs...)
}
if len(errors) > 0 {
glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].Name, source, errors)
glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].ID, source, errors)
continue
}
filtered = append(filtered, &pods[i])
@@ -271,20 +280,32 @@ func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.P
func (s *podStorage) Sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
}
// Object implements config.Accessor
func (s *podStorage) MergedState() interface{} {
s.podLock.RLock()
defer s.podLock.RUnlock()
pods := make([]kubelet.Pod, 0)
for source, sourcePods := range s.pods {
var pods []api.BoundPod
for _, sourcePods := range s.pods {
for _, podRef := range sourcePods {
pod := *podRef
pod.Namespace = source
pods = append(pods, pod)
pod, err := api.Scheme.Copy(podRef)
if err != nil {
glog.Errorf("unable to copy pod: %v", err)
}
pods = append(pods, *pod.(*api.BoundPod))
}
}
return pods
}
// podUniqueName returns a value for a given pod that is unique across a source,
// which is the combination of namespace and ID.
func podUniqueName(pod *api.BoundPod) string {
namespace := pod.Namespace
if len(namespace) == 0 {
namespace = api.NamespaceDefault
}
return fmt.Sprintf("%s.%s", pod.ID, namespace)
}

View File

@@ -18,6 +18,7 @@ package config
import (
"reflect"
"sort"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -32,7 +33,7 @@ func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
}
}
type sortedPods []kubelet.Pod
type sortedPods []api.BoundPod
func (s sortedPods) Len() int {
return len(s)
@@ -41,25 +42,27 @@ func (s sortedPods) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedPods) Less(i, j int) bool {
if s[i].Namespace < s[j].Namespace {
return true
}
return s[i].Name < s[j].Name
return s[i].ID < s[j].ID
}
func CreateValidPod(name, namespace string) kubelet.Pod {
return kubelet.Pod{
Name: name,
Namespace: namespace,
Manifest: api.ContainerManifest{
Version: "v1beta1",
func CreateValidPod(name, namespace, source string) api.BoundPod {
return api.BoundPod{
TypeMeta: api.TypeMeta{
ID: name,
Namespace: namespace,
Annotations: map[string]string{kubelet.ConfigSourceAnnotationKey: source},
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
},
}
}
func CreatePodUpdate(op kubelet.PodOperation, pods ...kubelet.Pod) kubelet.PodUpdate {
newPods := make([]kubelet.Pod, len(pods))
func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate {
if len(pods) == 0 {
return kubelet.PodUpdate{Op: op}
}
newPods := make([]api.BoundPod, len(pods))
for i := range pods {
newPods[i] = pods[i]
}
@@ -76,6 +79,7 @@ func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{},
func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kubelet.PodUpdate) {
for i := range expected {
update := <-ch
sort.Sort(sortedPods(update.Pods))
if !reflect.DeepEqual(expected[i], update) {
t.Fatalf("Expected %#v, Got %#v", expected[i], update)
}
@@ -95,24 +99,63 @@ func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
}
func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", ""))
channel <- podUpdate
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET))
}
func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test")))
}
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
// see an update in another namespace
podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test")))
}
func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
// add an invalid update
podUpdate = CreatePodUpdate(kubelet.UPDATE, kubelet.Pod{Name: "foo"})
podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo"}})
channel <- podUpdate
expectNoPodUpdate(t, ch)
}
@@ -121,16 +164,16 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
}
@@ -139,16 +182,16 @@ func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod))
}
@@ -157,21 +200,21 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
// an kubelet.ADD should be converted to kubelet.UPDATE
pod := CreateValidPod("foo", "test")
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
pod := CreateValidPod("foo", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.ADD, pod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
podUpdate = CreatePodUpdate(kubelet.REMOVE, kubelet.Pod{Name: "foo"})
podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo", Namespace: "new"}})
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod))
}
@@ -180,20 +223,20 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""), CreateValidPod("foo2", ""), CreateValidPod("foo3", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"), CreateValidPod("foo2", "test"), CreateValidPod("foo3", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
// should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE
pod := CreateValidPod("foo2", "test")
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", ""), CreateValidPod("foo4", "test"))
pod := CreateValidPod("foo2", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch,
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "test")),
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, pod))
}

View File

@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"path"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -86,21 +87,26 @@ func (s *SourceEtcd) run() {
// eventToPods takes a watch.Event object, and turns it into a structured list of pods.
// It returns a list of containers, or an error if one occurs.
func eventToPods(ev watch.Event) ([]kubelet.Pod, error) {
pods := []kubelet.Pod{}
manifests, ok := ev.Object.(*api.ContainerManifestList)
func eventToPods(ev watch.Event) ([]api.BoundPod, error) {
pods := []api.BoundPod{}
boundPods, ok := ev.Object.(*api.BoundPods)
if !ok {
return pods, errors.New("unable to parse response as ContainerManifestList")
return pods, errors.New("unable to parse response as BoundPods")
}
for i, manifest := range manifests.Items {
name := manifest.ID
if name == "" {
name = fmt.Sprintf("%d", i+1)
for i, pod := range boundPods.Items {
if len(pod.ID) == 0 {
pod.ID = fmt.Sprintf("%d", i+1)
}
pods = append(pods, kubelet.Pod{
Name: name,
Manifest: manifest})
// TODO: generate random UID if not present
if pod.UID == "" && !pod.CreationTimestamp.IsZero() {
pod.UID = strconv.FormatInt(pod.CreationTimestamp.Unix(), 10)
}
// Backwards compatibility with old api servers
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
pods = append(pods, pod)
}
return pods, nil

View File

@@ -21,53 +21,52 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestEventToPods(t *testing.T) {
tests := []struct {
input watch.Event
pods []kubelet.Pod
pods []api.BoundPod
fail bool
}{
{
input: watch.Event{Object: nil},
pods: []kubelet.Pod{},
pods: []api.BoundPod{},
fail: true,
},
{
input: watch.Event{Object: &api.ContainerManifestList{}},
pods: []kubelet.Pod{},
input: watch.Event{Object: &api.BoundPods{}},
pods: []api.BoundPod{},
fail: false,
},
{
input: watch.Event{
Object: &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
{ID: "bar"},
Object: &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
},
},
pods: []kubelet.Pod{
{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}},
{Name: "bar", Manifest: api.ContainerManifest{ID: "bar"}},
pods: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo", Namespace: "default"}, Spec: api.PodSpec{}},
{TypeMeta: api.TypeMeta{ID: "bar", Namespace: "default"}, Spec: api.PodSpec{}},
},
fail: false,
},
{
input: watch.Event{
Object: &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: ""},
{ID: ""},
Object: &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "1"}},
{TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}},
},
},
},
pods: []kubelet.Pod{
{Name: "1", Manifest: api.ContainerManifest{ID: ""}},
{Name: "2", Manifest: api.ContainerManifest{ID: ""}},
pods: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "1", Namespace: "default"}, Spec: api.PodSpec{}},
{TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}, Spec: api.PodSpec{}},
},
fail: false,
},

View File

@@ -29,8 +29,10 @@ import (
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"gopkg.in/v1/yaml"
)
@@ -79,7 +81,7 @@ func (s *SourceFile) extractFromPath() error {
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET}
default:
return fmt.Errorf("path is not a directory or file")
@@ -88,28 +90,29 @@ func (s *SourceFile) extractFromPath() error {
return nil
}
func extractFromDir(name string) ([]kubelet.Pod, error) {
pods := []kubelet.Pod{}
func extractFromDir(name string) ([]api.BoundPod, error) {
files, err := filepath.Glob(filepath.Join(name, "[^.]*"))
if err != nil {
return pods, err
return nil, err
}
if len(files) == 0 {
return nil, nil
}
sort.Strings(files)
pods := []api.BoundPod{}
for _, file := range files {
pod, err := extractFromFile(file)
if err != nil {
return []kubelet.Pod{}, err
return nil, err
}
pods = append(pods, pod)
}
return pods, nil
}
func extractFromFile(name string) (kubelet.Pod, error) {
var pod kubelet.Pod
func extractFromFile(name string) (api.BoundPod, error) {
var pod api.BoundPod
file, err := os.Open(name)
if err != nil {
@@ -123,15 +126,23 @@ func extractFromFile(name string) (kubelet.Pod, error) {
return pod, err
}
if err := yaml.Unmarshal(data, &pod.Manifest); err != nil {
return pod, fmt.Errorf("could not unmarshal manifest: %v", err)
manifest := &api.ContainerManifest{}
// TODO: use api.Scheme.DecodeInto
if err := yaml.Unmarshal(data, manifest); err != nil {
return pod, err
}
podName := pod.Manifest.ID
if podName == "" {
podName = simpleSubdomainSafeHash(name)
if err := api.Scheme.Convert(manifest, &pod); err != nil {
return pod, err
}
pod.ID = simpleSubdomainSafeHash(name)
if len(pod.UID) == 0 {
pod.UID = simpleSubdomainSafeHash(name)
}
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
pod.Name = podName
return pod, nil
}

View File

@@ -26,10 +26,57 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"gopkg.in/v1/yaml"
)
func ExampleManifestAndPod(id string) (api.ContainerManifest, api.BoundPod) {
manifest := api.ContainerManifest{
ID: id,
UUID: "uid",
Containers: []api.Container{
{
Name: "c" + id,
Image: "foo",
},
},
Volumes: []api.Volume{
{
Name: "host-dir",
Source: &api.VolumeSource{
HostDir: &api.HostDir{"/dir/path"},
},
},
},
}
expectedPod := api.BoundPod{
TypeMeta: api.TypeMeta{
ID: id,
UID: "uid",
Namespace: "default",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "c" + id,
Image: "foo",
},
},
Volumes: []api.Volume{
{
Name: "host-dir",
Source: &api.VolumeSource{
HostDir: &api.HostDir{"/dir/path"},
},
},
},
},
}
return manifest, expectedPod
}
func TestExtractFromNonExistentFile(t *testing.T) {
ch := make(chan interface{}, 1)
c := SourceFile{"/some/fake/file", ch}
@@ -70,16 +117,18 @@ func TestReadFromFile(t *testing.T) {
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{
Name: "test",
Manifest: api.ContainerManifest{
ID: "test",
Version: "v1beta1",
expected := CreatePodUpdate(kubelet.SET, api.BoundPod{
TypeMeta: api.TypeMeta{
ID: simpleSubdomainSafeHash(file.Name()),
UID: simpleSubdomainSafeHash(file.Name()),
Namespace: "default",
},
Spec: api.PodSpec{
Containers: []api.Container{{Image: "test/image"}},
},
})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(2 * time.Millisecond):
@@ -95,29 +144,31 @@ func TestExtractFromBadDataFile(t *testing.T) {
c := SourceFile{file.Name(), ch}
err := c.extractFromPath()
if err == nil {
t.Errorf("Expected error")
t.Fatalf("Expected error")
}
expectEmptyChannel(t, ch)
}
func TestExtractFromValidDataFile(t *testing.T) {
manifest := api.ContainerManifest{ID: ""}
manifest, expectedPod := ExampleManifestAndPod("id")
text, err := json.Marshal(manifest)
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
file := writeTestFile(t, os.TempDir(), "test_pod_config", string(text))
defer os.Remove(file.Name())
expectedPod.ID = simpleSubdomainSafeHash(file.Name())
ch := make(chan interface{}, 1)
c := SourceFile{file.Name(), ch}
err = c.extractFromPath()
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: simpleSubdomainSafeHash(file.Name()), Manifest: manifest})
expected := CreatePodUpdate(kubelet.SET, expectedPod)
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
@@ -126,7 +177,7 @@ func TestExtractFromValidDataFile(t *testing.T) {
func TestExtractFromEmptyDir(t *testing.T) {
dirName, err := ioutil.TempDir("", "foo")
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
defer os.RemoveAll(dirName)
@@ -134,7 +185,7 @@ func TestExtractFromEmptyDir(t *testing.T) {
c := SourceFile{dirName, ch}
err = c.extractFromPath()
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubelet.PodUpdate)
@@ -145,54 +196,55 @@ func TestExtractFromEmptyDir(t *testing.T) {
}
func TestExtractFromDir(t *testing.T) {
manifests := []api.ContainerManifest{
{Version: "v1beta1", Containers: []api.Container{{Name: "1", Image: "foo"}}},
{Version: "v1beta1", Containers: []api.Container{{Name: "2", Image: "bar"}}},
}
manifest, expectedPod := ExampleManifestAndPod("1")
manifest2, expectedPod2 := ExampleManifestAndPod("2")
manifests := []api.ContainerManifest{manifest, manifest2}
pods := []api.BoundPod{expectedPod, expectedPod2}
files := make([]*os.File, len(manifests))
dirName, err := ioutil.TempDir("", "foo")
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
for i, manifest := range manifests {
data, err := json.Marshal(manifest)
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
file, err := ioutil.TempFile(dirName, manifest.ID)
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
name := file.Name()
if err := file.Close(); err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
ioutil.WriteFile(name, data, 0755)
files[i] = file
pods[i].ID = simpleSubdomainSafeHash(name)
}
ch := make(chan interface{}, 1)
c := SourceFile{dirName, ch}
err = c.extractFromPath()
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(
kubelet.SET,
kubelet.Pod{Name: simpleSubdomainSafeHash(files[0].Name()), Manifest: manifests[0]},
kubelet.Pod{Name: simpleSubdomainSafeHash(files[1].Name()), Manifest: manifests[1]},
)
expected := CreatePodUpdate(kubelet.SET, pods...)
sort.Sort(sortedPods(update.Pods))
sort.Sort(sortedPods(expected.Pods))
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
for i := range update.Pods {
if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 {
if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 {
t.Errorf("Expected no validation errors on %#v, Got %#v", update.Pods[i], errs)
}
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"gopkg.in/v1/yaml"
)
@@ -79,6 +80,7 @@ func (s *SourceURL) extractFromURL() error {
// First try as if it's a single manifest
var manifest api.ContainerManifest
// TODO: should be api.Scheme.Decode
singleErr := yaml.Unmarshal(data, &manifest)
if singleErr == nil {
if errs := validation.ValidateManifest(&manifest); len(errs) > 0 {
@@ -86,16 +88,23 @@ func (s *SourceURL) extractFromURL() error {
}
}
if singleErr == nil {
pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest}
if pod.Name == "" {
pod.Name = "1"
pod := api.BoundPod{}
if err := api.Scheme.Convert(&manifest, &pod); err != nil {
return err
}
s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET}
if len(pod.ID) == 0 {
pod.ID = "1"
}
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET}
return nil
}
// That didn't work, so try an array of manifests.
var manifests []api.ContainerManifest
// TODO: should be api.Scheme.Decode
multiErr := yaml.Unmarshal(data, &manifests)
// We're not sure if the person reading the logs is going to care about the single or
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
@@ -113,18 +122,24 @@ func (s *SourceURL) extractFromURL() error {
// array of manifests (and no error) when unmarshaled as such. In that case,
// if the single manifest at least had a Version, we return the single-manifest
// error (if any).
if len(manifests) == 0 && manifest.Version != "" {
if len(manifests) == 0 && len(manifest.Version) != 0 {
return singleErr
}
pods := []kubelet.Pod{}
for i, manifest := range manifests {
pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest}
if pod.Name == "" {
pod.Name = fmt.Sprintf("%d", i+1)
}
pods = append(pods, pod)
list := api.ContainerManifestList{Items: manifests}
boundPods := &api.BoundPods{}
if err := api.Scheme.Convert(&list, boundPods); err != nil {
return err
}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
for i := range boundPods.Items {
pod := &boundPods.Items[i]
if len(pod.ID) == 0 {
pod.ID = fmt.Sprintf("%d", i+1)
}
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
}
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET}
return nil
}

View File

@@ -24,6 +24,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
@@ -122,11 +123,12 @@ func TestExtractFromHTTP(t *testing.T) {
desc: "Single manifest",
manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"},
expected: CreatePodUpdate(kubelet.SET,
kubelet.Pod{
Name: "foo",
Manifest: api.ContainerManifest{
Version: "v1beta1",
ID: "foo",
api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "default",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
},
}),
@@ -138,13 +140,19 @@ func TestExtractFromHTTP(t *testing.T) {
{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}},
},
expected: CreatePodUpdate(kubelet.SET,
kubelet.Pod{
Name: "1",
Manifest: api.ContainerManifest{Version: "v1beta1", ID: "", Containers: []api.Container{{Name: "1", Image: "foo"}}},
api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "1",
Namespace: "default",
},
Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}},
},
kubelet.Pod{
Name: "bar",
Manifest: api.ContainerManifest{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}},
api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "bar",
Namespace: "default",
},
Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}},
}),
},
{
@@ -167,13 +175,14 @@ func TestExtractFromHTTP(t *testing.T) {
c := SourceURL{testServer.URL, ch, nil}
if err := c.extractFromURL(); err != nil {
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
continue
}
update := (<-ch).(kubelet.PodUpdate)
if !reflect.DeepEqual(testCase.expected, update) {
t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
}
for i := range update.Pods {
if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 {
if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 {
t.Errorf("%s: Expected no validation errors on %#v, Got %#v", testCase.desc, update.Pods[i], errs)
}
}