diff --git a/pkg/api/conversion.go b/pkg/api/conversion.go index 5c8cd1c8097..e6f93710318 100644 --- a/pkg/api/conversion.go +++ b/pkg/api/conversion.go @@ -17,9 +17,64 @@ limitations under the License. package api import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) // Codec is the identity codec for this package - it can only convert itself // to itself. var Codec = runtime.CodecFor(Scheme, "") + +func init() { + Scheme.AddConversionFuncs( + // Convert ContainerManifest to BoundPod + func(in *ContainerManifest, out *BoundPod, s conversion.Scope) error { + out.Spec.Containers = in.Containers + out.Spec.Volumes = in.Volumes + out.Spec.RestartPolicy = in.RestartPolicy + out.ID = in.ID + out.UID = in.UUID + return nil + }, + func(in *BoundPod, out *ContainerManifest, s conversion.Scope) error { + out.Containers = in.Spec.Containers + out.Volumes = in.Spec.Volumes + out.RestartPolicy = in.Spec.RestartPolicy + out.Version = "v1beta2" + out.ID = in.ID + out.UUID = in.UID + return nil + }, + func(in *ContainerManifestList, out *BoundPods, s conversion.Scope) error { + if err := s.Convert(&in.Items, &out.Items, 0); err != nil { + return err + } + for i := range out.Items { + item := &out.Items[i] + item.ResourceVersion = in.ResourceVersion + } + return nil + }, + func(in *BoundPods, out *ContainerManifestList, s conversion.Scope) error { + if err := s.Convert(&in.Items, &out.Items, 0); err != nil { + return err + } + out.ResourceVersion = in.ResourceVersion + return nil + }, + + // Convert Pod to BoundPod + func(in *Pod, out *BoundPod, s conversion.Scope) error { + if err := s.Convert(&in.DesiredState.Manifest, out, 0); err != nil { + return err + } + // Only copy a subset of fields, and override manifest attributes with the pod + // metadata + out.UID = in.UID + out.ID = in.ID + out.Namespace = in.Namespace + out.CreationTimestamp = in.CreationTimestamp + return nil + }, + ) +} diff --git a/pkg/api/register.go b/pkg/api/register.go index 5e7bb8cc42e..95b22ca87f1 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -40,7 +40,9 @@ func init() { &Binding{}, &Event{}, &EventList{}, + &ContainerManifest{}, &ContainerManifestList{}, + &BoundPod{}, &BoundPods{}, ) } @@ -61,5 +63,7 @@ func (*ServerOp) IsAnAPIObject() {} func (*ServerOpList) IsAnAPIObject() {} func (*Event) IsAnAPIObject() {} func (*EventList) IsAnAPIObject() {} +func (*ContainerManifest) IsAnAPIObject() {} func (*ContainerManifestList) IsAnAPIObject() {} +func (*BoundPod) IsAnAPIObject() {} func (*BoundPods) IsAnAPIObject() {} diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index eb0e2481559..4feeda1bd8a 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -170,6 +170,10 @@ func TestTypes(t *testing.T) { t.Errorf("Couldn't make a %v? %v", kind, err) continue } + if _, err := runtime.FindTypeMeta(item); err != nil { + t.Logf("%s is not a TypeMeta and cannot be round tripped: %v", kind, err) + continue + } runTest(t, v1beta1.Codec, item) runTest(t, v1beta2.Codec, item) runTest(t, api.Codec, item) diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index 7a125b8a92d..b278d7769ec 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -42,7 +42,9 @@ func init() { &ServerOpList{}, &Event{}, &EventList{}, + &ContainerManifest{}, &ContainerManifestList{}, + &BoundPod{}, &BoundPods{}, ) } @@ -63,5 +65,7 @@ func (*ServerOp) IsAnAPIObject() {} func (*ServerOpList) IsAnAPIObject() {} func (*Event) IsAnAPIObject() {} func (*EventList) IsAnAPIObject() {} +func (*ContainerManifest) IsAnAPIObject() {} func (*ContainerManifestList) IsAnAPIObject() {} +func (*BoundPod) IsAnAPIObject() {} func (*BoundPods) IsAnAPIObject() {} diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index 2a5439b87c5..e0319b5ee19 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -42,7 +42,9 @@ func init() { &ServerOpList{}, &Event{}, &EventList{}, + &ContainerManifest{}, &ContainerManifestList{}, + &BoundPod{}, &BoundPods{}, ) } @@ -63,5 +65,7 @@ func (*ServerOp) IsAnAPIObject() {} func (*ServerOpList) IsAnAPIObject() {} func (*Event) IsAnAPIObject() {} func (*EventList) IsAnAPIObject() {} +func (*ContainerManifest) IsAnAPIObject() {} func (*ContainerManifestList) IsAnAPIObject() {} +func (*BoundPod) IsAnAPIObject() {} func (*BoundPods) IsAnAPIObject() {} diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index a68dc681148..25bc6aa6a76 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -437,3 +437,25 @@ func ValidateReadOnlyPersistentDisks(volumes []api.Volume) errs.ErrorList { } return allErrs } + +// ValidateBoundPod tests if required fields on a bound pod are set. +func ValidateBoundPod(pod *api.BoundPod) (errors []error) { + if !util.IsDNSSubdomain(pod.ID) { + errors = append(errors, errs.NewFieldInvalid("id", pod.ID)) + } + if !util.IsDNSSubdomain(pod.Namespace) { + errors = append(errors, errs.NewFieldInvalid("namespace", pod.Namespace)) + } + containerManifest := &api.ContainerManifest{ + Version: "v1beta2", + ID: pod.ID, + UUID: pod.UID, + Containers: pod.Spec.Containers, + Volumes: pod.Spec.Volumes, + RestartPolicy: pod.Spec.RestartPolicy, + } + if errs := ValidateManifest(containerManifest); len(errs) != 0 { + errors = append(errors, errs...) + } + return errors +} diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 9e0ed6366b3..baa3b0171fd 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -823,3 +823,21 @@ func TestValidateReplicationController(t *testing.T) { } } } + +func TestValidateBoundPodNoName(t *testing.T) { + errorCases := map[string]api.BoundPod{ + // manifest is tested in api/validation_test.go, ensure it is invoked + "empty version": {TypeMeta: api.TypeMeta{ID: "test"}, Spec: api.PodSpec{Containers: []api.Container{{Name: ""}}}}, + + // Name + "zero-length name": {TypeMeta: api.TypeMeta{ID: ""}}, + "name > 255 characters": {TypeMeta: api.TypeMeta{ID: strings.Repeat("a", 256)}}, + "name not a DNS subdomain": {TypeMeta: api.TypeMeta{ID: "a.b.c."}}, + "name with underscore": {TypeMeta: api.TypeMeta{ID: "a_b_c"}}, + } + for k, v := range errorCases { + if errs := ValidateBoundPod(&v); len(errs) == 0 { + t.Errorf("expected failure for %s", k) + } + } +} diff --git a/pkg/cloudprovider/controller/minioncontroller_test.go b/pkg/cloudprovider/controller/minioncontroller_test.go index 4e3f3221cbd..ef117fc44cd 100644 --- a/pkg/cloudprovider/controller/minioncontroller_test.go +++ b/pkg/cloudprovider/controller/minioncontroller_test.go @@ -33,10 +33,12 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *etcdregistry.Registry { - registry := etcdregistry.NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, - &pod.BasicManifestFactory{ + registry := etcdregistry.NewRegistry( + tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, + &pod.BasicBoundPodFactory{ ServiceRegistry: ®istrytest.ServiceRegistry{}, - }) + }, + ) return registry } diff --git a/pkg/constraint/constraint.go b/pkg/constraint/constraint.go index 0e08162792f..8acc8c073d8 100644 --- a/pkg/constraint/constraint.go +++ b/pkg/constraint/constraint.go @@ -20,8 +20,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -// Allowed returns true if manifests is a collection of manifests +// Allowed returns true if pods is a collection of bound pods // which can run without conflict on a single minion. -func Allowed(manifests []api.ContainerManifest) bool { - return !PortsConflict(manifests) +func Allowed(pods []api.BoundPod) bool { + return !PortsConflict(pods) } diff --git a/pkg/constraint/constraint_test.go b/pkg/constraint/constraint_test.go index 4d168551fc7..1677d935df5 100644 --- a/pkg/constraint/constraint_test.go +++ b/pkg/constraint/constraint_test.go @@ -30,27 +30,27 @@ func containerWithHostPorts(ports ...int) api.Container { return c } -func manifestWithContainers(containers ...api.Container) api.ContainerManifest { - m := api.ContainerManifest{} +func podWithContainers(containers ...api.Container) api.BoundPod { + m := api.BoundPod{} for _, c := range containers { - m.Containers = append(m.Containers, c) + m.Spec.Containers = append(m.Spec.Containers, c) } return m } func TestAllowed(t *testing.T) { table := []struct { - allowed bool - manifests []api.ContainerManifest + allowed bool + pods []api.BoundPod }{ { allowed: true, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(1, 2, 3), containerWithHostPorts(4, 5, 6), ), - manifestWithContainers( + podWithContainers( containerWithHostPorts(7, 8, 9), containerWithHostPorts(10, 11, 12), ), @@ -58,12 +58,12 @@ func TestAllowed(t *testing.T) { }, { allowed: true, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(0, 0), containerWithHostPorts(0, 0), ), - manifestWithContainers( + podWithContainers( containerWithHostPorts(0, 0), containerWithHostPorts(0, 0), ), @@ -71,19 +71,19 @@ func TestAllowed(t *testing.T) { }, { allowed: false, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(3, 3), ), }, }, { allowed: false, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(6), ), - manifestWithContainers( + podWithContainers( containerWithHostPorts(6), ), }, @@ -91,8 +91,8 @@ func TestAllowed(t *testing.T) { } for _, item := range table { - if e, a := item.allowed, Allowed(item.manifests); e != a { - t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.manifests) + if e, a := item.allowed, Allowed(item.pods); e != a { + t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.pods) } } } diff --git a/pkg/constraint/ports.go b/pkg/constraint/ports.go index 92622469a2c..f5c95f5fb2d 100644 --- a/pkg/constraint/ports.go +++ b/pkg/constraint/ports.go @@ -22,10 +22,10 @@ import ( // PortsConflict returns true iff two containers attempt to expose // the same host port. -func PortsConflict(manifests []api.ContainerManifest) bool { +func PortsConflict(pods []api.BoundPod) bool { hostPorts := map[int]struct{}{} - for _, manifest := range manifests { - for _, container := range manifest.Containers { + for _, pod := range pods { + for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.HostPort == 0 { continue diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index a110044f072..9c57ccc52d8 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -21,7 +21,9 @@ import ( "reflect" "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" @@ -89,7 +91,7 @@ func (c *PodConfig) Sync() { type podStorage struct { podLock sync.RWMutex // map of source name to pod name to pod reference - pods map[string]map[string]*kubelet.Pod + pods map[string]map[string]*api.BoundPod mode PodConfigNotificationMode // ensures that updates are delivered in strict order @@ -103,7 +105,7 @@ type podStorage struct { // TODO: allow initialization of the current state of the store with snapshotted version. func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage { return &podStorage{ - pods: make(map[string]map[string]*kubelet.Pod), + pods: make(map[string]map[string]*api.BoundPod), mode: mode, updates: updates, } @@ -136,12 +138,12 @@ func (s *podStorage) Merge(source string, change interface{}) error { s.updates <- *updates } if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} } case PodConfigNotificationSnapshot: if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} } default: @@ -161,7 +163,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de pods := s.pods[source] if pods == nil { - pods = make(map[string]*kubelet.Pod) + pods = make(map[string]*api.BoundPod) } update := change.(kubelet.PodUpdate) @@ -175,11 +177,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de filtered := filterInvalidPods(update.Pods, source) for _, ref := range filtered { - name := ref.Name + name := podUniqueName(ref) if existing, found := pods[name]; found { - if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + if !reflect.DeepEqual(existing.Spec, ref.Spec) { // this is an update - existing.Manifest = ref.Manifest + existing.Spec = ref.Spec updates.Pods = append(updates.Pods, *existing) continue } @@ -187,7 +189,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de continue } // this is an add - ref.Namespace = source + if ref.Annotations == nil { + ref.Annotations = make(map[string]string) + } + ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source pods[name] = ref adds.Pods = append(adds.Pods, *ref) } @@ -195,7 +200,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de case kubelet.REMOVE: glog.V(4).Infof("Removing a pod %v", update) for _, value := range update.Pods { - name := value.Name + name := podUniqueName(&value) if existing, found := pods[name]; found { // this is a delete delete(pods, name) @@ -209,23 +214,26 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de glog.V(4).Infof("Setting pods for source %s : %v", source, update) // Clear the old map entries by just creating a new map oldPods := pods - pods = make(map[string]*kubelet.Pod) + pods = make(map[string]*api.BoundPod) filtered := filterInvalidPods(update.Pods, source) for _, ref := range filtered { - name := ref.Name + name := podUniqueName(ref) if existing, found := oldPods[name]; found { pods[name] = existing - if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + if !reflect.DeepEqual(existing.Spec, ref.Spec) { // this is an update - existing.Manifest = ref.Manifest + existing.Spec = ref.Spec updates.Pods = append(updates.Pods, *existing) continue } // this is a no-op continue } - ref.Namespace = source + if ref.Annotations == nil { + ref.Annotations = make(map[string]string) + } + ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source pods[name] = ref adds.Pods = append(adds.Pods, *ref) } @@ -246,20 +254,21 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de return adds, updates, deletes } -func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.Pod) { +func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) { names := util.StringSet{} for i := range pods { var errors []error - if names.Has(pods[i].Name) { - errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].Name)) + name := podUniqueName(&pods[i]) + if names.Has(name) { + errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].ID)) } else { - names.Insert(pods[i].Name) + names.Insert(name) } - if errs := kubelet.ValidatePod(&pods[i]); len(errs) != 0 { + if errs := validation.ValidateBoundPod(&pods[i]); len(errs) != 0 { errors = append(errors, errs...) } if len(errors) > 0 { - glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].Name, source, errors) + glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].ID, source, errors) continue } filtered = append(filtered, &pods[i]) @@ -271,20 +280,32 @@ func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.P func (s *podStorage) Sync() { s.updateLock.Lock() defer s.updateLock.Unlock() - s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} } // Object implements config.Accessor func (s *podStorage) MergedState() interface{} { s.podLock.RLock() defer s.podLock.RUnlock() - pods := make([]kubelet.Pod, 0) - for source, sourcePods := range s.pods { + var pods []api.BoundPod + for _, sourcePods := range s.pods { for _, podRef := range sourcePods { - pod := *podRef - pod.Namespace = source - pods = append(pods, pod) + pod, err := api.Scheme.Copy(podRef) + if err != nil { + glog.Errorf("unable to copy pod: %v", err) + } + pods = append(pods, *pod.(*api.BoundPod)) } } return pods } + +// podUniqueName returns a value for a given pod that is unique across a source, +// which is the combination of namespace and ID. +func podUniqueName(pod *api.BoundPod) string { + namespace := pod.Namespace + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return fmt.Sprintf("%s.%s", pod.ID, namespace) +} diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 05d85386255..4671c65aa80 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -18,6 +18,7 @@ package config import ( "reflect" + "sort" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -32,7 +33,7 @@ func expectEmptyChannel(t *testing.T, ch <-chan interface{}) { } } -type sortedPods []kubelet.Pod +type sortedPods []api.BoundPod func (s sortedPods) Len() int { return len(s) @@ -41,25 +42,27 @@ func (s sortedPods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s sortedPods) Less(i, j int) bool { - if s[i].Namespace < s[j].Namespace { - return true - } - return s[i].Name < s[j].Name + return s[i].ID < s[j].ID } -func CreateValidPod(name, namespace string) kubelet.Pod { - return kubelet.Pod{ - Name: name, - Namespace: namespace, - Manifest: api.ContainerManifest{ - Version: "v1beta1", +func CreateValidPod(name, namespace, source string) api.BoundPod { + return api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: name, + Namespace: namespace, + Annotations: map[string]string{kubelet.ConfigSourceAnnotationKey: source}, + }, + Spec: api.PodSpec{ RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, }, } } -func CreatePodUpdate(op kubelet.PodOperation, pods ...kubelet.Pod) kubelet.PodUpdate { - newPods := make([]kubelet.Pod, len(pods)) +func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate { + if len(pods) == 0 { + return kubelet.PodUpdate{Op: op} + } + newPods := make([]api.BoundPod, len(pods)) for i := range pods { newPods[i] = pods[i] } @@ -76,6 +79,7 @@ func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kubelet.PodUpdate) { for i := range expected { update := <-ch + sort.Sort(sortedPods(update.Pods)) if !reflect.DeepEqual(expected[i], update) { t.Fatalf("Expected %#v, Got %#v", expected[i], update) } @@ -95,24 +99,63 @@ func TestNewPodAdded(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) +} + +func TestNewPodAddedInvalidNamespace(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", "")) + channel <- podUpdate + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET)) +} + +func TestNewPodAddedDefaultNamespace(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"))) +} + +func TestNewPodAddedDifferentNamespaces(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) + + // see an update in another namespace + podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test"))) } func TestInvalidPodFiltered(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) // add an invalid update - podUpdate = CreatePodUpdate(kubelet.UPDATE, kubelet.Pod{Name: "foo"}) + podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo"}}) channel <- podUpdate expectNoPodUpdate(t, ch) } @@ -121,16 +164,16 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) // container updates are separated as UPDATE pod := podUpdate.Pods[0] - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} channel <- CreatePodUpdate(kubelet.ADD, pod) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) } @@ -139,16 +182,16 @@ func TestNewPodAddedSnapshot(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) // container updates are separated as UPDATE pod := podUpdate.Pods[0] - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} channel <- CreatePodUpdate(kubelet.ADD, pod) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod)) } @@ -157,21 +200,21 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) // an kubelet.ADD should be converted to kubelet.UPDATE - pod := CreateValidPod("foo", "test") - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + pod := CreateValidPod("foo", "new", "test") + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} podUpdate = CreatePodUpdate(kubelet.ADD, pod) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) - podUpdate = CreatePodUpdate(kubelet.REMOVE, kubelet.Pod{Name: "foo"}) + podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo", Namespace: "new"}}) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod)) } @@ -180,20 +223,20 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""), CreateValidPod("foo2", ""), CreateValidPod("foo3", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"), CreateValidPod("foo2", "test"), CreateValidPod("foo3", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) // should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE - pod := CreateValidPod("foo2", "test") - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} - podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", ""), CreateValidPod("foo4", "test")) + pod := CreateValidPod("foo2", "new", "test") + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} + podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test")) channel <- podUpdate expectPodUpdate(t, ch, - CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "test")), - CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "test")), + CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")), + CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")), CreatePodUpdate(kubelet.UPDATE, pod)) } diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index a8a822c3b83..aae9d07d2d6 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "path" + "strconv" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -33,7 +34,7 @@ import ( ) func EtcdKeyForHost(hostname string) string { - return path.Join("/", "registry", "hosts", hostname, "kubelet") + return path.Join("/", "registry", "nodes", hostname, "boundpods") } type SourceEtcd struct { @@ -86,26 +87,27 @@ func (s *SourceEtcd) run() { // eventToPods takes a watch.Event object, and turns it into a structured list of pods. // It returns a list of containers, or an error if one occurs. -func eventToPods(ev watch.Event) ([]kubelet.Pod, error) { - pods := []kubelet.Pod{} - manifests, ok := ev.Object.(*api.ContainerManifestList) +func eventToPods(ev watch.Event) ([]api.BoundPod, error) { + pods := []api.BoundPod{} + boundPods, ok := ev.Object.(*api.BoundPods) if !ok { - return pods, errors.New("unable to parse response as ContainerManifestList") + return pods, errors.New("unable to parse response as BoundPods") } - for i, manifest := range manifests.Items { - name := manifest.ID - if name == "" { - name = fmt.Sprintf("%d", i+1) + for i, pod := range boundPods.Items { + if len(pod.ID) == 0 { + pod.ID = fmt.Sprintf("%d", i+1) } - pods = append(pods, kubelet.Pod{ - Name: name, - Manifest: manifest}) + // TODO: generate random UID if not present + if pod.UID == "" && !pod.CreationTimestamp.IsZero() { + pod.UID = strconv.FormatInt(pod.CreationTimestamp.Unix(), 10) + } + // Backwards compatibility with old api servers + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault + } + pods = append(pods, pod) } return pods, nil } - -func makeContainerKey(machine string) string { - return "/registry/hosts/" + machine + "/kubelet" -} diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go index 917acd40e53..6e3cb10126e 100644 --- a/pkg/kubelet/config/etcd_test.go +++ b/pkg/kubelet/config/etcd_test.go @@ -21,53 +21,52 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestEventToPods(t *testing.T) { tests := []struct { input watch.Event - pods []kubelet.Pod + pods []api.BoundPod fail bool }{ { input: watch.Event{Object: nil}, - pods: []kubelet.Pod{}, + pods: []api.BoundPod{}, fail: true, }, { - input: watch.Event{Object: &api.ContainerManifestList{}}, - pods: []kubelet.Pod{}, + input: watch.Event{Object: &api.BoundPods{}}, + pods: []api.BoundPod{}, fail: false, }, { input: watch.Event{ - Object: &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "foo"}, - {ID: "bar"}, + Object: &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo"}}, + {TypeMeta: api.TypeMeta{ID: "bar"}}, }, }, }, - pods: []kubelet.Pod{ - {Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}, - {Name: "bar", Manifest: api.ContainerManifest{ID: "bar"}}, + pods: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo", Namespace: "default"}, Spec: api.PodSpec{}}, + {TypeMeta: api.TypeMeta{ID: "bar", Namespace: "default"}, Spec: api.PodSpec{}}, }, fail: false, }, { input: watch.Event{ - Object: &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: ""}, - {ID: ""}, + Object: &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "1"}}, + {TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}}, }, }, }, - pods: []kubelet.Pod{ - {Name: "1", Manifest: api.ContainerManifest{ID: ""}}, - {Name: "2", Manifest: api.ContainerManifest{ID: ""}}, + pods: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "1", Namespace: "default"}, Spec: api.PodSpec{}}, + {TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}, Spec: api.PodSpec{}}, }, fail: false, }, diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 111f7cf83f0..107f8d3dced 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -29,8 +29,10 @@ import ( "strings" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" "gopkg.in/v1/yaml" ) @@ -79,7 +81,7 @@ func (s *SourceFile) extractFromPath() error { if err != nil { return err } - s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} default: return fmt.Errorf("path is not a directory or file") @@ -88,28 +90,29 @@ func (s *SourceFile) extractFromPath() error { return nil } -func extractFromDir(name string) ([]kubelet.Pod, error) { - pods := []kubelet.Pod{} - +func extractFromDir(name string) ([]api.BoundPod, error) { files, err := filepath.Glob(filepath.Join(name, "[^.]*")) if err != nil { - return pods, err + return nil, err + } + if len(files) == 0 { + return nil, nil } sort.Strings(files) - + pods := []api.BoundPod{} for _, file := range files { pod, err := extractFromFile(file) if err != nil { - return []kubelet.Pod{}, err + return nil, err } pods = append(pods, pod) } return pods, nil } -func extractFromFile(name string) (kubelet.Pod, error) { - var pod kubelet.Pod +func extractFromFile(name string) (api.BoundPod, error) { + var pod api.BoundPod file, err := os.Open(name) if err != nil { @@ -123,15 +126,23 @@ func extractFromFile(name string) (kubelet.Pod, error) { return pod, err } - if err := yaml.Unmarshal(data, &pod.Manifest); err != nil { - return pod, fmt.Errorf("could not unmarshal manifest: %v", err) + manifest := &api.ContainerManifest{} + // TODO: use api.Scheme.DecodeInto + if err := yaml.Unmarshal(data, manifest); err != nil { + return pod, err } - podName := pod.Manifest.ID - if podName == "" { - podName = simpleSubdomainSafeHash(name) + if err := api.Scheme.Convert(manifest, &pod); err != nil { + return pod, err + } + + pod.ID = simpleSubdomainSafeHash(name) + if len(pod.UID) == 0 { + pod.UID = simpleSubdomainSafeHash(name) + } + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault } - pod.Name = podName return pod, nil } diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index 4fed13fd298..0e280f49e7e 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -26,10 +26,57 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "gopkg.in/v1/yaml" ) +func ExampleManifestAndPod(id string) (api.ContainerManifest, api.BoundPod) { + manifest := api.ContainerManifest{ + ID: id, + UUID: "uid", + Containers: []api.Container{ + { + Name: "c" + id, + Image: "foo", + }, + }, + Volumes: []api.Volume{ + { + Name: "host-dir", + Source: &api.VolumeSource{ + HostDir: &api.HostDir{"/dir/path"}, + }, + }, + }, + } + expectedPod := api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: id, + UID: "uid", + Namespace: "default", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "c" + id, + Image: "foo", + }, + }, + Volumes: []api.Volume{ + { + Name: "host-dir", + Source: &api.VolumeSource{ + HostDir: &api.HostDir{"/dir/path"}, + }, + }, + }, + }, + } + return manifest, expectedPod +} + func TestExtractFromNonExistentFile(t *testing.T) { ch := make(chan interface{}, 1) c := SourceFile{"/some/fake/file", ch} @@ -70,16 +117,18 @@ func TestReadFromFile(t *testing.T) { select { case got := <-ch: update := got.(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{ - Name: "test", - Manifest: api.ContainerManifest{ - ID: "test", - Version: "v1beta1", + expected := CreatePodUpdate(kubelet.SET, api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: simpleSubdomainSafeHash(file.Name()), + UID: simpleSubdomainSafeHash(file.Name()), + Namespace: "default", + }, + Spec: api.PodSpec{ Containers: []api.Container{{Image: "test/image"}}, }, }) if !reflect.DeepEqual(expected, update) { - t.Errorf("Expected %#v, Got %#v", expected, update) + t.Fatalf("Expected %#v, Got %#v", expected, update) } case <-time.After(2 * time.Millisecond): @@ -95,29 +144,31 @@ func TestExtractFromBadDataFile(t *testing.T) { c := SourceFile{file.Name(), ch} err := c.extractFromPath() if err == nil { - t.Errorf("Expected error") + t.Fatalf("Expected error") } expectEmptyChannel(t, ch) } func TestExtractFromValidDataFile(t *testing.T) { - manifest := api.ContainerManifest{ID: ""} + manifest, expectedPod := ExampleManifestAndPod("id") text, err := json.Marshal(manifest) if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } file := writeTestFile(t, os.TempDir(), "test_pod_config", string(text)) defer os.Remove(file.Name()) + expectedPod.ID = simpleSubdomainSafeHash(file.Name()) + ch := make(chan interface{}, 1) c := SourceFile{file.Name(), ch} err = c.extractFromPath() if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } update := (<-ch).(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: simpleSubdomainSafeHash(file.Name()), Manifest: manifest}) + expected := CreatePodUpdate(kubelet.SET, expectedPod) if !reflect.DeepEqual(expected, update) { t.Errorf("Expected %#v, Got %#v", expected, update) } @@ -126,7 +177,7 @@ func TestExtractFromValidDataFile(t *testing.T) { func TestExtractFromEmptyDir(t *testing.T) { dirName, err := ioutil.TempDir("", "foo") if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } defer os.RemoveAll(dirName) @@ -134,7 +185,7 @@ func TestExtractFromEmptyDir(t *testing.T) { c := SourceFile{dirName, ch} err = c.extractFromPath() if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } update := (<-ch).(kubelet.PodUpdate) @@ -145,54 +196,55 @@ func TestExtractFromEmptyDir(t *testing.T) { } func TestExtractFromDir(t *testing.T) { - manifests := []api.ContainerManifest{ - {Version: "v1beta1", Containers: []api.Container{{Name: "1", Image: "foo"}}}, - {Version: "v1beta1", Containers: []api.Container{{Name: "2", Image: "bar"}}}, - } + manifest, expectedPod := ExampleManifestAndPod("1") + manifest2, expectedPod2 := ExampleManifestAndPod("2") + + manifests := []api.ContainerManifest{manifest, manifest2} + pods := []api.BoundPod{expectedPod, expectedPod2} files := make([]*os.File, len(manifests)) dirName, err := ioutil.TempDir("", "foo") if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } for i, manifest := range manifests { data, err := json.Marshal(manifest) if err != nil { t.Errorf("Unexpected error: %v", err) + continue } file, err := ioutil.TempFile(dirName, manifest.ID) if err != nil { t.Errorf("Unexpected error: %v", err) + continue } name := file.Name() if err := file.Close(); err != nil { t.Errorf("Unexpected error: %v", err) + continue } ioutil.WriteFile(name, data, 0755) files[i] = file + pods[i].ID = simpleSubdomainSafeHash(name) } ch := make(chan interface{}, 1) c := SourceFile{dirName, ch} err = c.extractFromPath() if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } update := (<-ch).(kubelet.PodUpdate) - expected := CreatePodUpdate( - kubelet.SET, - kubelet.Pod{Name: simpleSubdomainSafeHash(files[0].Name()), Manifest: manifests[0]}, - kubelet.Pod{Name: simpleSubdomainSafeHash(files[1].Name()), Manifest: manifests[1]}, - ) + expected := CreatePodUpdate(kubelet.SET, pods...) sort.Sort(sortedPods(update.Pods)) sort.Sort(sortedPods(expected.Pods)) if !reflect.DeepEqual(expected, update) { - t.Errorf("Expected %#v, Got %#v", expected, update) + t.Fatalf("Expected %#v, Got %#v", expected, update) } for i := range update.Pods { - if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 { + if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 { t.Errorf("Expected no validation errors on %#v, Got %#v", update.Pods[i], errs) } } diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index 6aca6333b2f..455201b13b3 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" "gopkg.in/v1/yaml" ) @@ -79,6 +80,7 @@ func (s *SourceURL) extractFromURL() error { // First try as if it's a single manifest var manifest api.ContainerManifest + // TODO: should be api.Scheme.Decode singleErr := yaml.Unmarshal(data, &manifest) if singleErr == nil { if errs := validation.ValidateManifest(&manifest); len(errs) > 0 { @@ -86,16 +88,23 @@ func (s *SourceURL) extractFromURL() error { } } if singleErr == nil { - pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest} - if pod.Name == "" { - pod.Name = "1" + pod := api.BoundPod{} + if err := api.Scheme.Convert(&manifest, &pod); err != nil { + return err } - s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + if len(pod.ID) == 0 { + pod.ID = "1" + } + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault + } + s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} return nil } // That didn't work, so try an array of manifests. var manifests []api.ContainerManifest + // TODO: should be api.Scheme.Decode multiErr := yaml.Unmarshal(data, &manifests) // We're not sure if the person reading the logs is going to care about the single or // multiple manifest unmarshalling attempt, so we need to put both in the logs, as is @@ -113,18 +122,24 @@ func (s *SourceURL) extractFromURL() error { // array of manifests (and no error) when unmarshaled as such. In that case, // if the single manifest at least had a Version, we return the single-manifest // error (if any). - if len(manifests) == 0 && manifest.Version != "" { + if len(manifests) == 0 && len(manifest.Version) != 0 { return singleErr } - pods := []kubelet.Pod{} - for i, manifest := range manifests { - pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest} - if pod.Name == "" { - pod.Name = fmt.Sprintf("%d", i+1) - } - pods = append(pods, pod) + list := api.ContainerManifestList{Items: manifests} + boundPods := &api.BoundPods{} + if err := api.Scheme.Convert(&list, boundPods); err != nil { + return err } - s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + for i := range boundPods.Items { + pod := &boundPods.Items[i] + if len(pod.ID) == 0 { + pod.ID = fmt.Sprintf("%d", i+1) + } + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault + } + } + s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET} return nil } diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index e561d0d7d57..13a08aeaf41 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -122,11 +123,12 @@ func TestExtractFromHTTP(t *testing.T) { desc: "Single manifest", manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"}, expected: CreatePodUpdate(kubelet.SET, - kubelet.Pod{ - Name: "foo", - Manifest: api.ContainerManifest{ - Version: "v1beta1", - ID: "foo", + api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "default", + }, + Spec: api.PodSpec{ RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, }, }), @@ -138,13 +140,19 @@ func TestExtractFromHTTP(t *testing.T) { {Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}}, }, expected: CreatePodUpdate(kubelet.SET, - kubelet.Pod{ - Name: "1", - Manifest: api.ContainerManifest{Version: "v1beta1", ID: "", Containers: []api.Container{{Name: "1", Image: "foo"}}}, + api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "1", + Namespace: "default", + }, + Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}}, }, - kubelet.Pod{ - Name: "bar", - Manifest: api.ContainerManifest{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}}, + api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "bar", + Namespace: "default", + }, + Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}}, }), }, { @@ -167,13 +175,14 @@ func TestExtractFromHTTP(t *testing.T) { c := SourceURL{testServer.URL, ch, nil} if err := c.extractFromURL(); err != nil { t.Errorf("%s: Unexpected error: %v", testCase.desc, err) + continue } update := (<-ch).(kubelet.PodUpdate) if !reflect.DeepEqual(testCase.expected, update) { t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update) } for i := range update.Pods { - if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 { + if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 { t.Errorf("%s: Expected no validation errors on %#v, Got %#v", testCase.desc, update.Pods[i], errs) } } diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 6dc70d300c6..ecb2229b6df 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -212,7 +212,7 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc // TODO(dchen1107): Remove the old separator "--" by end of Oct if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") && !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") { - glog.Infof("Docker Container:%s is not managed by kubelet.", container.Names[0]) + glog.Infof("Docker Container: %s is not managed by kubelet.", container.Names[0]) continue } result[DockerID(container.ID)] = container @@ -330,7 +330,7 @@ func inspectContainer(client DockerInterface, dockerID, containerName string) (* } // GetDockerPodInfo returns docker info for all containers in the pod/manifest. -func GetDockerPodInfo(client DockerInterface, manifest api.ContainerManifest, podFullName, uuid string) (api.PodInfo, error) { +func GetDockerPodInfo(client DockerInterface, manifest api.PodSpec, podFullName, uuid string) (api.PodInfo, error) { info := api.PodInfo{} containers, err := client.ListContainers(docker.ListContainersOptions{All: true}) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 32e6b02e385..d681e453a7f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -55,7 +55,7 @@ type CadvisorInterface interface { // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { - SyncPods([]Pod) error + SyncPods([]api.BoundPod) error } type volumeMap map[string]volume.Interface @@ -111,7 +111,7 @@ type Kubelet struct { networkContainerImage string podWorkers *podWorkers resyncInterval time.Duration - pods []Pod + pods []api.BoundPod // Optional, no events will be sent without it etcdClient tools.EtcdClient @@ -213,7 +213,7 @@ func makeEnvironmentVariables(container *api.Container) []string { return result } -func makeBinds(pod *Pod, container *api.Container, podVolumes volumeMap) []string { +func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { binds := []string{} for _, mount := range container.VolumeMounts { vol, ok := podVolumes[mount.Name] @@ -276,10 +276,10 @@ func milliCPUToShares(milliCPU int) int { return shares } -func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volumeMap, error) { +func (kl *Kubelet) mountExternalVolumes(pod *api.BoundPod) (volumeMap, error) { podVolumes := make(volumeMap) - for _, vol := range manifest.Volumes { - extVolume, err := volume.CreateVolumeBuilder(&vol, manifest.ID, kl.rootDirectory) + for _, vol := range pod.Spec.Volumes { + extVolume, err := volume.CreateVolumeBuilder(&vol, pod.ID, kl.rootDirectory) if err != nil { return nil, err } @@ -323,18 +323,18 @@ func (kl *Kubelet) runHandler(podFullName, uuid string, container *api.Container } // Run a single container from a pod. Returns the docker container ID -func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) { +func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) { envVariables := makeEnvironmentVariables(container) binds := makeBinds(pod, container, podVolumes) exposedPorts, portBindings := makePortsAndBindings(container) opts := docker.CreateContainerOptions{ - Name: dockertools.BuildDockerName(pod.Manifest.UUID, GetPodFullName(pod), container), + Name: dockertools.BuildDockerName(pod.UID, GetPodFullName(pod), container), Config: &docker.Config{ Cmd: container.Command, Env: envVariables, ExposedPorts: exposedPorts, - Hostname: pod.Name, + Hostname: pod.ID, Image: container.Image, Memory: int64(container.Memory), CpuShares: int64(milliCPUToShares(container.CPU)), @@ -358,7 +358,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v Privileged: privileged, }) if err == nil && container.Lifecycle != nil && container.Lifecycle.PostStart != nil { - handlerErr := kl.runHandler(GetPodFullName(pod), pod.Manifest.UUID, container, container.Lifecycle.PostStart) + handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart) if handlerErr != nil { kl.killContainerByID(dockerContainer.ID, "") return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr) @@ -392,11 +392,11 @@ const ( ) // createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container. -func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error) { +func (kl *Kubelet) createNetworkContainer(pod *api.BoundPod) (dockertools.DockerID, error) { var ports []api.Port // Docker only exports ports from the network container. Let's // collect all of the relevant ports and export them. - for _, container := range pod.Manifest.Containers { + for _, container := range pod.Spec.Containers { ports = append(ports, container.Ports...) } container := &api.Container{ @@ -419,12 +419,12 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error // Delete all containers in a pod (except the network container) returns the number of containers deleted // and an error if one occurs. -func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) { +func (kl *Kubelet) deleteAllContainers(pod *api.BoundPod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) { count := 0 - errs := make(chan error, len(pod.Manifest.Containers)) + errs := make(chan error, len(pod.Spec.Containers)) wg := sync.WaitGroup{} - for _, container := range pod.Manifest.Containers { - if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.Manifest.UUID, container.Name); found { + for _, container := range pod.Spec.Containers { + if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found { count++ wg.Add(1) go func() { @@ -451,9 +451,9 @@ func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerConta type empty struct{} -func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContainers) error { +func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) error { podFullName := GetPodFullName(pod) - uuid := pod.Manifest.UUID + uuid := pod.UID containersToKeep := make(map[dockertools.DockerID]empty) killedContainers := make(map[dockertools.DockerID]empty) @@ -484,7 +484,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine } containersToKeep[netID] = empty{} - podVolumes, err := kl.mountExternalVolumes(&pod.Manifest) + podVolumes, err := kl.mountExternalVolumes(pod) if err != nil { glog.Errorf("Unable to mount volumes for pod %s: (%v) Skipping pod.", podFullName, err) return err @@ -501,7 +501,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine podState.PodIP = netInfo.PodIP } - for _, container := range pod.Manifest.Containers { + for _, container := range pod.Spec.Containers { expectedHash := dockertools.HashContainer(&container) if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uuid, container.Name); found { containerID := dockertools.DockerID(dockerContainer.ID) @@ -538,13 +538,13 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine // TODO(dawnchen): error handling here? } - if len(recentContainers) > 0 && pod.Manifest.RestartPolicy.Always == nil { - if pod.Manifest.RestartPolicy.Never != nil { + if len(recentContainers) > 0 && pod.Spec.RestartPolicy.Always == nil { + if pod.Spec.RestartPolicy.Never != nil { glog.V(3).Infof("Already ran container with name %s--%s--%s, do nothing", podFullName, uuid, container.Name) continue } - if pod.Manifest.RestartPolicy.OnFailure != nil { + if pod.Spec.RestartPolicy.OnFailure != nil { // Check the exit code of last run if recentContainers[0].State.ExitCode == 0 { glog.V(3).Infof("Already successfully ran container with name %s--%s--%s, do nothing", @@ -605,11 +605,11 @@ type podContainer struct { // Stores all volumes defined by the set of pods into a map. // Keys for each entry are in the format (POD_ID)/(VOLUME_NAME) -func getDesiredVolumes(pods []Pod) map[string]api.Volume { +func getDesiredVolumes(pods []api.BoundPod) map[string]api.Volume { desiredVolumes := make(map[string]api.Volume) for _, pod := range pods { - for _, volume := range pod.Manifest.Volumes { - identifier := path.Join(pod.Manifest.ID, volume.Name) + for _, volume := range pod.Spec.Volumes { + identifier := path.Join(pod.ID, volume.Name) desiredVolumes[identifier] = volume } } @@ -618,7 +618,7 @@ func getDesiredVolumes(pods []Pod) map[string]api.Volume { // Compares the map of current volumes to the map of desired volumes. // If an active volume does not have a respective desired volume, clean it up. -func (kl *Kubelet) reconcileVolumes(pods []Pod) error { +func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error { desiredVolumes := getDesiredVolumes(pods) currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory) for name, vol := range currentVolumes { @@ -637,7 +637,7 @@ func (kl *Kubelet) reconcileVolumes(pods []Pod) error { } // SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(pods []Pod) error { +func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { glog.V(4).Infof("Desired [%s]: %+v", kl.hostname, pods) var err error desiredContainers := make(map[podContainer]empty) @@ -652,11 +652,11 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { for ix := range pods { pod := &pods[ix] podFullName := GetPodFullName(pod) - uuid := pod.Manifest.UUID + uuid := pod.UID // Add all containers (including net) to the map. desiredContainers[podContainer{podFullName, uuid, networkContainerName}] = empty{} - for _, cont := range pod.Manifest.Containers { + for _, cont := range pod.Spec.Containers { desiredContainers[podContainer{podFullName, uuid, cont.Name}] = empty{} } @@ -693,13 +693,13 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { } // filterHostPortConflicts removes pods that conflict on Port.HostPort values -func filterHostPortConflicts(pods []Pod) []Pod { - filtered := []Pod{} +func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { + filtered := []api.BoundPod{} ports := map[int]bool{} extract := func(p *api.Port) int { return p.HostPort } for i := range pods { pod := &pods[i] - if errs := validation.AccumulateUniquePorts(pod.Manifest.Containers, ports, extract); len(errs) != 0 { + if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 { glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs) continue } @@ -784,10 +784,10 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri // GetPodInfo returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) { - var manifest api.ContainerManifest + var manifest api.PodSpec for _, pod := range kl.pods { if GetPodFullName(&pod) == podFullName { - manifest = pod.Manifest + manifest = pod.Spec break } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index df323fccc26..b0dff6f9705 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -125,7 +125,7 @@ func TestKillContainer(t *testing.T) { } type channelReader struct { - list [][]Pod + list [][]api.BoundPod wg sync.WaitGroup } @@ -145,7 +145,7 @@ func startReading(channel <-chan interface{}) *channelReader { return cr } -func (cr *channelReader) GetList() [][]Pod { +func (cr *channelReader) GetList() [][]api.BoundPod { cr.wg.Wait() return cr.list } @@ -156,21 +156,23 @@ func TestSyncPodsDoesNothing(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // format is k8s__ - Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.test"}, + Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.new.test"}, ID: "1234", }, { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ container, }, @@ -209,12 +211,14 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { kubelet, _, fakeDocker := newTestKubelet(t) kubelet.networkContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -242,8 +246,8 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { } if len(fakeDocker.Created) != 2 || - !matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { + !matchString(t, "k8s_net\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) || + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[1]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -255,12 +259,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { puller.HasImages = []string{} kubelet.networkContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -282,8 +288,8 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { } if len(fakeDocker.Created) != 2 || - !matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { + !matchString(t, "k8s_net\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) || + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[1]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -294,16 +300,18 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -320,7 +328,7 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) { fakeDocker.Lock() if len(fakeDocker.Created) != 1 || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) { + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -333,16 +341,18 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ { Name: "bar", @@ -370,7 +380,7 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) { fakeDocker.Lock() if len(fakeDocker.Created) != 1 || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) { + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -384,16 +394,18 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // format is k8s__ - Names: []string{"/k8s_bar_foo.test"}, + Names: []string{"/k8s_bar_foo.new.test"}, ID: "1234", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -425,12 +437,12 @@ func TestSyncPodsDeletes(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_foo_bar.test"}, + Names: []string{"/k8s_foo_bar.new.test"}, ID: "1234", }, { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, { @@ -438,7 +450,7 @@ func TestSyncPodsDeletes(t *testing.T) { ID: "4567", }, } - err := kubelet.SyncPods([]Pod{}) + err := kubelet.SyncPods([]api.BoundPod{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -463,30 +475,32 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_foo_bar.test_1"}, + Names: []string{"/k8s_foo_bar.new.test_1"}, ID: "1234", }, "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_bar.test_"}, + Names: []string{"/k8s_net_bar.new.test_"}, ID: "9876", }, "4567": &docker.APIContainers{ // Duplicate for the same container. - Names: []string{"/k8s_foo_bar.test_2"}, + Names: []string{"/k8s_foo_bar.new.test_2"}, ID: "4567", }, "2304": &docker.APIContainers{ // Container for another pod, untouched. - Names: []string{"/k8s_baz_fiz.test_6"}, + Names: []string{"/k8s_baz_fiz.new.test_6"}, ID: "2304", }, } - err := kubelet.syncPod(&Pod{ - Name: "bar", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "bar", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "bar", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "foo"}, }, @@ -520,20 +534,22 @@ func TestSyncPodBadHash(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_bar.1234_foo.test"}, + Names: []string{"/k8s_bar.1234_foo.new.test"}, ID: "1234", }, "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.syncPod(&Pod{ - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -562,20 +578,22 @@ func TestSyncPodUnhealthy(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_bar_foo.test"}, + Names: []string{"/k8s_bar_foo.new.test"}, ID: "1234", }, "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.syncPod(&Pod{ - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar", LivenessProbe: &api.LivenessProbe{ @@ -629,25 +647,31 @@ func TestMakeEnvVariables(t *testing.T) { func TestMountExternalVolumes(t *testing.T) { kubelet, _, _ := newTestKubelet(t) - manifest := api.ContainerManifest{ - Volumes: []api.Volume{ - { - Name: "host-dir", - Source: &api.VolumeSource{ - HostDir: &api.HostDir{"/dir/path"}, + pod := api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "test", + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "host-dir", + Source: &api.VolumeSource{ + HostDir: &api.HostDir{"/dir/path"}, + }, }, }, }, } - podVolumes, _ := kubelet.mountExternalVolumes(&manifest) + podVolumes, _ := kubelet.mountExternalVolumes(&pod) expectedPodVolumes := make(volumeMap) expectedPodVolumes["host-dir"] = &volume.HostDir{"/dir/path"} if len(expectedPodVolumes) != len(podVolumes) { - t.Errorf("Unexpected volumes. Expected %#v got %#v. Manifest was: %#v", expectedPodVolumes, podVolumes, manifest) + t.Errorf("Unexpected volumes. Expected %#v got %#v. Manifest was: %#v", expectedPodVolumes, podVolumes, pod) } for name, expectedVolume := range expectedPodVolumes { if _, ok := podVolumes[name]; !ok { - t.Errorf("Pod volumes map is missing key: %s. %#v", expectedVolume, podVolumes) + t.Errorf("api.BoundPod volumes map is missing key: %s. %#v", expectedVolume, podVolumes) } } } @@ -678,9 +702,11 @@ func TestMakeVolumesAndBinds(t *testing.T) { }, } - pod := Pod{ - Name: "pod", - Namespace: "test", + pod := api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "pod", + Namespace: "test", + }, } podVolumes := volumeMap{ @@ -769,26 +795,26 @@ func TestMakePortsAndBindings(t *testing.T) { } func TestCheckHostPortConflicts(t *testing.T) { - successCaseAll := []Pod{ - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, + successCaseAll := []api.BoundPod{ + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } - successCaseNew := Pod{ - Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}}, + successCaseNew := api.BoundPod{ + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}}, } expected := append(successCaseAll, successCaseNew) if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) { t.Errorf("Expected %#v, Got %#v", expected, actual) } - failureCaseAll := []Pod{ - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, + failureCaseAll := []api.BoundPod{ + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } - failureCaseNew := Pod{ - Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, + failureCaseNew := api.BoundPod{ + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, } if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) { t.Errorf("Expected %#v, Got %#v", expected, actual) @@ -965,7 +991,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) { podNamespace := "etcd" containerName := "containerFoo" output, err := kubelet.RunInContainer( - GetPodFullName(&Pod{Name: podName, Namespace: podNamespace}), + GetPodFullName(&api.BoundPod{TypeMeta: api.TypeMeta{ID: podName, Namespace: podNamespace}}), "", containerName, []string{"ls"}) @@ -990,13 +1016,19 @@ func TestRunInContainer(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { ID: containerID, - Names: []string{"/k8s_" + containerName + "_" + podName + "." + podNamespace + "_1234"}, + Names: []string{"/k8s_" + containerName + "_" + podName + "." + podNamespace + ".test_1234"}, }, } cmd := []string{"ls"} _, err := kubelet.RunInContainer( - GetPodFullName(&Pod{Name: podName, Namespace: podNamespace}), + GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podName, + Namespace: podNamespace, + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + }), "", containerName, cmd) @@ -1128,15 +1160,17 @@ func TestSyncPodEventHandlerFails(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.syncPod(&Pod{ - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar", Lifecycle: &api.Lifecycle{ diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index cdb00a6edfd..0053a4bb8b0 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/golang/glog" ) @@ -32,7 +33,7 @@ const ( ) type RunPodResult struct { - Pod *Pod + Pod *api.BoundPod Err error } @@ -50,7 +51,7 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) { } // runOnce runs a given set of pods and returns their status. -func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { +func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err error) { if kl.dockerPuller == nil { kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) } @@ -72,10 +73,10 @@ func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { results = append(results, res) if res.Err != nil { // TODO(proppy): report which containers failed the pod. - glog.Infof("failed to start pod %q: %v", res.Pod.Name, res.Err) - failedPods = append(failedPods, res.Pod.Name) + glog.Infof("failed to start pod %q: %v", res.Pod.ID, res.Err) + failedPods = append(failedPods, res.Pod.ID) } else { - glog.Infof("started pod %q", res.Pod.Name) + glog.Infof("started pod %q", res.Pod.ID) } } if len(failedPods) > 0 { @@ -86,7 +87,7 @@ func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { } // runPod runs a single pod and wait until all containers are running. -func (kl *Kubelet) runPod(pod Pod) error { +func (kl *Kubelet) runPod(pod api.BoundPod) error { delay := RunOnceRetryDelay retry := 0 for { @@ -95,18 +96,18 @@ func (kl *Kubelet) runPod(pod Pod) error { return fmt.Errorf("failed to get kubelet docker containers: %v", err) } if running := kl.isPodRunning(pod, dockerContainers); running { - glog.Infof("pod %q containers running", pod.Name) + glog.Infof("pod %q containers running", pod.ID) return nil } - glog.Infof("pod %q containers not running: syncing", pod.Name) + glog.Infof("pod %q containers not running: syncing", pod.ID) if err = kl.syncPod(&pod, dockerContainers); err != nil { return fmt.Errorf("error syncing pod: %v", err) } if retry >= RunOnceMaxRetries { - return fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries) + return fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.ID, RunOnceMaxRetries) } // TODO(proppy): health checking would be better than waiting + checking the state at the next iteration. - glog.Infof("pod %q containers synced, waiting for %v", pod.Name, delay) + glog.Infof("pod %q containers synced, waiting for %v", pod.ID, delay) <-time.After(delay) retry++ delay *= RunOnceRetryDelayBackoff @@ -114,9 +115,9 @@ func (kl *Kubelet) runPod(pod Pod) error { } // isPodRunning returns true if all containers of a manifest are running. -func (kl *Kubelet) isPodRunning(pod Pod, dockerContainers dockertools.DockerContainers) bool { - for _, container := range pod.Manifest.Containers { - if dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.Manifest.UUID, container.Name); !found || dockerContainer.Status != "running" { +func (kl *Kubelet) isPodRunning(pod api.BoundPod, dockerContainers dockertools.DockerContainers) bool { + for _, container := range pod.Spec.Containers { + if dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.UID, container.Name); !found || dockerContainer.Status != "running" { glog.Infof("container %q not found (%v) or not running: %#v", container.Name, found, dockerContainer) return false } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index c110eccf928..e23be900585 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -69,12 +69,12 @@ func TestRunOnce(t *testing.T) { kb := &Kubelet{} podContainers := []docker.APIContainers{ { - Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.test"}, + Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.new.test"}, ID: "1234", Status: "running", }, { - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", Status: "running", }, @@ -106,12 +106,14 @@ func TestRunOnce(t *testing.T) { t: t, } kb.dockerPuller = &dockertools.FakeDockerPuller{} - results, err := kb.runOnce([]Pod{ + results, err := kb.runOnce([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -124,7 +126,7 @@ func TestRunOnce(t *testing.T) { if results[0].Err != nil { t.Errorf("unexpected run pod error: %v", results[0].Err) } - if results[0].Pod.Name != "foo" { - t.Errorf("unexpected pod: %q", results[0].Pod.Name) + if results[0].Pod.ID != "foo" { + t.Errorf("unexpected pod: %q", results[0].Pod.ID) } } diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 0fdf1a86785..16d7f689342 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -119,15 +119,26 @@ func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) { return } // This is to provide backward compatibility. It only supports a single manifest - var pod Pod - err = yaml.Unmarshal(data, &pod.Manifest) + var pod api.BoundPod + var containerManifest api.ContainerManifest + err = yaml.Unmarshal(data, &containerManifest) if err != nil { s.error(w, err) return } + pod.ID = containerManifest.ID + pod.UID = containerManifest.UUID + pod.Spec.Containers = containerManifest.Containers + pod.Spec.Volumes = containerManifest.Volumes + pod.Spec.RestartPolicy = containerManifest.RestartPolicy //TODO: sha1 of manifest? - pod.Name = "1" - s.updates <- PodUpdate{[]Pod{pod}, SET} + if pod.ID == "" { + pod.ID = "1" + } + if pod.UID == "" { + pod.UID = "1" + } + s.updates <- PodUpdate{[]api.BoundPod{pod}, SET} } @@ -139,16 +150,16 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) { s.error(w, err) return } - var manifests []api.ContainerManifest - err = yaml.Unmarshal(data, &manifests) + var specs []api.PodSpec + err = yaml.Unmarshal(data, &specs) if err != nil { s.error(w, err) return } - pods := make([]Pod, len(manifests)) - for i := range manifests { - pods[i].Name = fmt.Sprintf("%d", i+1) - pods[i].Manifest = manifests[i] + pods := make([]api.BoundPod, len(specs)) + for i := range specs { + pods[i].ID = fmt.Sprintf("%d", i+1) + pods[i].Spec = specs[i] } s.updates <- PodUpdate{pods, SET} @@ -186,7 +197,14 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { follow, _ := strconv.ParseBool(uriValues.Get("follow")) tail := uriValues.Get("tail") - podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podID, + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) fw := FlushWriter{writer: w} if flusher, ok := w.(http.Flusher); ok { @@ -216,10 +234,17 @@ func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) { return } // TODO: backwards compatibility with existing API, needs API change - podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podID, + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) info, err := s.host.GetPodInfo(podFullName, podUUID) if err == dockertools.ErrNoContainersInPod { - http.Error(w, "Pod does not exist", http.StatusNotFound) + http.Error(w, "api.BoundPod does not exist", http.StatusNotFound) return } if err != nil { @@ -283,7 +308,14 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) { http.Error(w, "Unexpected path for command running", http.StatusBadRequest) return } - podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podID, + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) command := strings.Split(u.Query().Get("cmd"), " ") data, err := s.host.RunInContainer(podFullName, uuid, container, command) if err != nil { @@ -327,10 +359,24 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { errors.New("pod level status currently unimplemented") case 3: // Backward compatibility without uuid information - podFullName := GetPodFullName(&Pod{Name: components[1], Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: components[1], + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query) case 4: - podFullName := GetPodFullName(&Pod{Name: components[1], Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: components[1], + // TODO: I am broken + Namespace: "", + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) stats, err = s.host.GetContainerInfo(podFullName, components[2], components[2], &query) default: http.Error(w, "unknown resource.", http.StatusNotFound) diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 91a742fbea0..3261337c47c 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -101,7 +101,23 @@ func readResp(resp *http.Response) (string, error) { func TestContainer(t *testing.T) { fw := newServerTest() expected := []api.ContainerManifest{ - {ID: "test_manifest"}, + { + ID: "test_manifest", + UUID: "value", + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, } body := bytes.NewBuffer([]byte(util.EncodeJSON(expected[0]))) // Only send a single ContainerManifest resp, err := http.Post(fw.testHTTPServer.URL+"/container", "application/json", body) @@ -114,7 +130,29 @@ func TestContainer(t *testing.T) { if len(received) != 1 { t.Errorf("Expected 1 manifest, but got %v", len(received)) } - expectedPods := []Pod{{Name: "1", Manifest: expected[0]}} + expectedPods := []api.BoundPod{ + { + TypeMeta: api.TypeMeta{ + ID: "test_manifest", + UID: "value", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + }, + } if !reflect.DeepEqual(expectedPods, received[0]) { t.Errorf("Expected %#v, but got %#v", expectedPods, received[0]) } @@ -123,8 +161,38 @@ func TestContainer(t *testing.T) { func TestContainers(t *testing.T) { fw := newServerTest() expected := []api.ContainerManifest{ - {ID: "test_manifest_1"}, - {ID: "test_manifest_2"}, + { + ID: "test_manifest_1", + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + { + ID: "test_manifest_2", + Containers: []api.Container{ + { + Name: "container2", + }, + }, + Volumes: []api.Volume{ + { + Name: "test2", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, } body := bytes.NewBuffer([]byte(util.EncodeJSON(expected))) resp, err := http.Post(fw.testHTTPServer.URL+"/containers", "application/json", body) @@ -137,7 +205,48 @@ func TestContainers(t *testing.T) { if len(received) != 1 { t.Errorf("Expected 1 update, but got %v", len(received)) } - expectedPods := []Pod{{Name: "1", Manifest: expected[0]}, {Name: "2", Manifest: expected[1]}} + expectedPods := []api.BoundPod{ + { + TypeMeta: api.TypeMeta{ + ID: "1", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + }, + { + TypeMeta: api.TypeMeta{ + ID: "2", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container2", + }, + }, + Volumes: []api.Volume{ + { + Name: "test2", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + }, + } if !reflect.DeepEqual(expectedPods, received[0]) { t.Errorf("Expected %#v, but got %#v", expectedPods, received[0]) } @@ -149,7 +258,7 @@ func TestPodInfo(t *testing.T) { "goodpod": api.ContainerStatus{}, } fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) { - if name == "goodpod.etcd" { + if name == "goodpod.default.etcd" { return expected, nil } return nil, fmt.Errorf("bad pod %s", name) @@ -175,7 +284,7 @@ func TestContainerInfo(t *testing.T) { fw := newServerTest() expectedInfo := &info.ContainerInfo{} podID := "somepod" - expectedPodID := "somepod" + ".etcd" + expectedPodID := "somepod" + ".default.etcd" expectedContainerName := "goodcontainer" fw.fakeKubelet.containerInfoFunc = func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { if podID != expectedPodID || containerName != expectedContainerName { @@ -278,7 +387,7 @@ func TestServeRunInContainer(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedCommand := "ls -a" fw.fakeKubelet.runFunc = func(podFullName, uuid, containerName string, cmd []string) ([]byte, error) { @@ -317,7 +426,7 @@ func TestServeRunInContainerWithUUID(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedUuid := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720" expectedContainerName := "baz" expectedCommand := "ls -a" @@ -360,7 +469,7 @@ func TestContainerLogs(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedTail := "" expectedFollow := false @@ -399,7 +508,7 @@ func TestContainerLogsWithTail(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedTail := "5" expectedFollow := false @@ -438,7 +547,7 @@ func TestContainerLogsWithFollow(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedTail := "" expectedFollow := true diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index bd063fc129a..d8f167f909d 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -22,13 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -// Pod represents the structure of a pod on the Kubelet, distinct from the apiserver -// representation of a Pod. -type Pod struct { - Namespace string - Name string - Manifest api.ContainerManifest -} +const ConfigSourceAnnotationKey = "kubernetes/config.source" // PodOperation defines what changes will be made on a pod configuration. type PodOperation int @@ -48,13 +42,13 @@ const ( // sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required). // For setting the state of the system to a given state for this source configuration, set // Pods as desired and Op to SET, which will reset the system state to that specified in this -// operation for this source channel. To remove all pods, set Pods to empty array and Op to SET. +// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET. type PodUpdate struct { - Pods []Pod + Pods []api.BoundPod Op PodOperation } -// GetPodFullName returns a name that full identifies a pod across all config sources. -func GetPodFullName(pod *Pod) string { - return fmt.Sprintf("%s.%s", pod.Name, pod.Namespace) +// GetPodFullName returns a name that uniquely identifies a pod across all config sources. +func GetPodFullName(pod *api.BoundPod) string { + return fmt.Sprintf("%s.%s.%s", pod.ID, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey]) } diff --git a/pkg/kubelet/validation.go b/pkg/kubelet/validation.go deleted file mode 100644 index 6cf3cfbada9..00000000000 --- a/pkg/kubelet/validation.go +++ /dev/null @@ -1,33 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" -) - -func ValidatePod(pod *Pod) (errors []error) { - if !util.IsDNSSubdomain(pod.Name) { - errors = append(errors, apierrs.NewFieldInvalid("name", pod.Name)) - } - if errs := validation.ValidateManifest(&pod.Manifest); len(errs) != 0 { - errors = append(errors, errs...) - } - return errors -} diff --git a/pkg/kubelet/validation_test.go b/pkg/kubelet/validation_test.go deleted file mode 100644 index 499199750bd..00000000000 --- a/pkg/kubelet/validation_test.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet_test - -import ( - "strings" - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - . "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" -) - -func TestValidatePodNoName(t *testing.T) { - errorCases := map[string]Pod{ - // manifest is tested in api/validation_test.go, ensure it is invoked - "empty version": {Name: "test", Manifest: api.ContainerManifest{Version: "", ID: "abc"}}, - - // Name - "zero-length name": {Name: "", Manifest: api.ContainerManifest{Version: "v1beta1"}}, - "name > 255 characters": {Name: strings.Repeat("a", 256), Manifest: api.ContainerManifest{Version: "v1beta1"}}, - "name not a DNS subdomain": {Name: "a.b.c.", Manifest: api.ContainerManifest{Version: "v1beta1"}}, - "name with underscore": {Name: "a_b_c", Manifest: api.ContainerManifest{Version: "v1beta1"}}, - } - for k, v := range errorCases { - if errs := ValidatePod(&v); len(errs) == 0 { - t.Errorf("expected failure for %s", k) - } - } -} diff --git a/pkg/master/master.go b/pkg/master/master.go index 37bb8c6915f..3d4d88a8059 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -89,15 +89,15 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe func New(c *Config) *Master { minionRegistry := makeMinionRegistry(c) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) - manifestFactory := &pod.BasicManifestFactory{ + boundPodFactory := &pod.BasicBoundPodFactory{ ServiceRegistry: serviceRegistry, } m := &Master{ - podRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory), + podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory), controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil), serviceRegistry: serviceRegistry, endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil), - bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory), + bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory), eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), minionRegistry: minionRegistry, client: c.Client, diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 540a052ac45..001ba16273e 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -51,15 +51,15 @@ const ( // Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd. type Registry struct { tools.EtcdHelper - manifestFactory pod.ManifestFactory + boundPodFactory pod.BoundPodFactory } // NewRegistry creates an etcd registry. -func NewRegistry(helper tools.EtcdHelper, manifestFactory pod.ManifestFactory) *Registry { +func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry { registry := &Registry{ EtcdHelper: helper, } - registry.manifestFactory = manifestFactory + registry.boundPodFactory = boundPodFactory return registry } @@ -177,7 +177,7 @@ func (r *Registry) GetPod(ctx api.Context, id string) (*api.Pod, error) { } func makeContainerKey(machine string) string { - return "/registry/hosts/" + machine + "/kubelet" + return "/registry/nodes/" + machine + "/boundpods" } // CreatePod creates a pod based on a specification. @@ -230,18 +230,18 @@ func (r *Registry) assignPod(ctx api.Context, podID string, machine string) erro return err } // TODO: move this to a watch/rectification loop. - manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod) + pod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod) if err != nil { return err } contKey := makeContainerKey(machine) - err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) { - manifests := *in.(*api.ContainerManifestList) - manifests.Items = append(manifests.Items, manifest) - if !constraint.Allowed(manifests.Items) { + err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) { + pods := *in.(*api.BoundPods) + pods.Items = append(pods.Items, *pod) + if !constraint.Allowed(pods.Items) { return nil, fmt.Errorf("The assignment would cause a constraint violation") } - return &manifests, nil + return &pods, nil }) if err != nil { // Put the pod's host back the way it was. This is a terrible hack that @@ -321,13 +321,13 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error { } // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) { - manifests := in.(*api.ContainerManifestList) - newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) + return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) { + pods := in.(*api.BoundPods) + newPods := make([]api.BoundPod, 0, len(pods.Items)) found := false - for _, manifest := range manifests.Items { - if manifest.ID != podID { - newManifests = append(newManifests, manifest) + for _, pod := range pods.Items { + if pod.ID != podID { + newPods = append(newPods, pod) } else { found = true } @@ -336,10 +336,10 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error { // This really shouldn't happen, it indicates something is broken, and likely // there is a lost pod somewhere. // However it is "deleted" so log it and move on - glog.Warningf("Couldn't find: %s in %#v", podID, manifests) + glog.Warningf("Couldn't find: %s in %#v", podID, pods) } - manifests.Items = newManifests - return manifests, nil + pods.Items = newPods + return pods, nil }) } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index f738a734aab..51c787646b8 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -36,7 +36,7 @@ import ( func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, - &pod.BasicManifestFactory{ + &pod.BasicBoundPodFactory{ ServiceRegistry: ®istrytest.ServiceRegistry{}, }) return registry @@ -160,7 +160,7 @@ func TestEtcdCreatePod(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0) + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0) registry := NewTestEtcdRegistry(fakeClient) err := registry.CreatePod(ctx, &api.Pod{ TypeMeta: api.TypeMeta{ @@ -199,15 +199,15 @@ func TestEtcdCreatePod(t *testing.T) { if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests api.ContainerManifestList - resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + var boundPods api.BoundPods + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests) - if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" { - t.Errorf("Unexpected manifest list: %#v", manifests) + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) } } @@ -268,7 +268,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -310,7 +310,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -355,15 +355,15 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests api.ContainerManifestList - resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + var boundPods api.BoundPods + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests) - if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" { - t.Errorf("Unexpected manifest list: %#v", manifests) + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) } } @@ -378,9 +378,9 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "bar"}, + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "bar"}}, }, }), 0) registry := NewTestEtcdRegistry(fakeClient) @@ -422,15 +422,15 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests api.ContainerManifestList - resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + var boundPods api.BoundPods + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests) - if len(manifests.Items) != 2 || manifests.Items[1].ID != "foo" { - t.Errorf("Unexpected manifest list: %#v", manifests) + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 2 || boundPods.Items[1].ID != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) } } @@ -511,7 +511,7 @@ func TestEtcdUpdatePodScheduled(t *testing.T) { }, }), 1) - contKey := "/registry/hosts/machine/kubelet" + contKey := "/registry/nodes/machine/boundpods" fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ Items: []api.ContainerManifest{ { @@ -586,9 +586,9 @@ func TestEtcdDeletePod(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, }), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "foo"}, + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo"}}, }, }), 0) registry := NewTestEtcdRegistry(fakeClient) @@ -602,13 +602,13 @@ func TestEtcdDeletePod(t *testing.T) { } else if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } - var manifests api.ContainerManifestList - latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests) - if len(manifests.Items) != 0 { + var boundPods api.BoundPods + latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) + if len(boundPods.Items) != 0 { t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value) } } @@ -622,10 +622,10 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, }), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "foo"}, - {ID: "bar"}, + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo"}}, + {TypeMeta: api.TypeMeta{ID: "bar"}}, }, }), 0) registry := NewTestEtcdRegistry(fakeClient) @@ -640,17 +640,17 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } - var manifests api.ContainerManifestList - latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests) - if len(manifests.Items) != 1 { - t.Fatalf("Unexpected manifest set: %#v, expected empty", manifests) + var boundPods api.BoundPods + latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) + if len(boundPods.Items) != 1 { + t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods) } - if manifests.Items[0].ID != "bar" { - t.Errorf("Deleted wrong manifest: %#v", manifests) + if boundPods.Items[0].ID != "bar" { + t.Errorf("Deleted wrong boundPod: %#v", boundPods) } } diff --git a/pkg/registry/pod/manifest_factory.go b/pkg/registry/pod/bound_pod_factory.go similarity index 67% rename from pkg/registry/pod/manifest_factory.go rename to pkg/registry/pod/bound_pod_factory.go index db34efa43e0..00030acac58 100644 --- a/pkg/registry/pod/manifest_factory.go +++ b/pkg/registry/pod/bound_pod_factory.go @@ -21,24 +21,27 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" ) -type ManifestFactory interface { +type BoundPodFactory interface { // Make a container object for a given pod, given the machine that the pod is running on. - MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) + MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) } -type BasicManifestFactory struct { +type BasicBoundPodFactory struct { // TODO: this should really point at the API rather than a registry ServiceRegistry service.Registry } -func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) { +func (b *BasicBoundPodFactory) MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) { envVars, err := service.GetServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine) if err != nil { - return api.ContainerManifest{}, err + return nil, err } - for ix, container := range pod.DesiredState.Manifest.Containers { - pod.DesiredState.Manifest.ID = pod.ID - pod.DesiredState.Manifest.Containers[ix].Env = append(container.Env, envVars...) + boundPod := &api.BoundPod{} + if err := api.Scheme.Convert(pod, boundPod); err != nil { + return nil, err } - return pod.DesiredState.Manifest, nil + for ix, container := range boundPod.Spec.Containers { + boundPod.Spec.Containers[ix].Env = append(container.Env, envVars...) + } + return boundPod, nil } diff --git a/pkg/registry/pod/manifest_factory_test.go b/pkg/registry/pod/bound_pod_factory_test.go similarity index 78% rename from pkg/registry/pod/manifest_factory_test.go rename to pkg/registry/pod/bound_pod_factory_test.go index e24d127c296..6fb0a96ca2f 100644 --- a/pkg/registry/pod/manifest_factory_test.go +++ b/pkg/registry/pod/bound_pod_factory_test.go @@ -25,13 +25,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func TestMakeManifestNoServices(t *testing.T) { +func TestMakeBoundPodNoServices(t *testing.T) { registry := registrytest.ServiceRegistry{} - factory := &BasicManifestFactory{ + factory := &BasicBoundPodFactory{ ServiceRegistry: ®istry, } - manifest, err := factory.MakeManifest("machine", api.Pod{ + pod, err := factory.MakeBoundPod("machine", &api.Pod{ TypeMeta: api.TypeMeta{ID: "foobar"}, DesiredState: api.PodState{ Manifest: api.ContainerManifest{ @@ -44,19 +44,19 @@ func TestMakeManifestNoServices(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } - container := manifest.Containers[0] + container := pod.Spec.Containers[0] if len(container.Env) != 0 { - t.Errorf("Expected zero env vars, got: %#v", manifest) + t.Errorf("Expected zero env vars, got: %#v", pod) } - if manifest.ID != "foobar" { - t.Errorf("Failed to assign ID to manifest: %#v", manifest.ID) + if pod.ID != "foobar" { + t.Errorf("Failed to assign ID to pod: %#v", pod.ID) } } -func TestMakeManifestServices(t *testing.T) { +func TestMakeBoundPodServices(t *testing.T) { registry := registrytest.ServiceRegistry{ List: api.ServiceList{ Items: []api.Service{ @@ -72,11 +72,11 @@ func TestMakeManifestServices(t *testing.T) { }, }, } - factory := &BasicManifestFactory{ + factory := &BasicBoundPodFactory{ ServiceRegistry: ®istry, } - manifest, err := factory.MakeManifest("machine", api.Pod{ + pod, err := factory.MakeBoundPod("machine", &api.Pod{ DesiredState: api.PodState{ Manifest: api.ContainerManifest{ Containers: []api.Container{ @@ -88,10 +88,10 @@ func TestMakeManifestServices(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } - container := manifest.Containers[0] + container := pod.Spec.Containers[0] envs := []api.EnvVar{ { Name: "TEST_SERVICE_HOST", @@ -123,8 +123,7 @@ func TestMakeManifestServices(t *testing.T) { }, } if len(container.Env) != len(envs) { - t.Errorf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), manifest) - return + t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod) } for ix := range container.Env { if !reflect.DeepEqual(envs[ix], container.Env[ix]) { @@ -133,7 +132,7 @@ func TestMakeManifestServices(t *testing.T) { } } -func TestMakeManifestServicesExistingEnvVar(t *testing.T) { +func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) { registry := registrytest.ServiceRegistry{ List: api.ServiceList{ Items: []api.Service{ @@ -149,11 +148,11 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, }, } - factory := &BasicManifestFactory{ + factory := &BasicBoundPodFactory{ ServiceRegistry: ®istry, } - manifest, err := factory.MakeManifest("machine", api.Pod{ + pod, err := factory.MakeBoundPod("machine", &api.Pod{ DesiredState: api.PodState{ Manifest: api.ContainerManifest{ Containers: []api.Container{ @@ -170,10 +169,10 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } - container := manifest.Containers[0] + container := pod.Spec.Containers[0] envs := []api.EnvVar{ { @@ -210,8 +209,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, } if len(container.Env) != len(envs) { - t.Errorf("Expected %d env vars, got: %#v", len(envs), manifest) - return + t.Fatalf("Expected %d env vars, got: %#v", len(envs), pod) } for ix := range container.Env { if !reflect.DeepEqual(envs[ix], container.Env[ix]) {