Merge pull request #1840 from eparis/bind_pods

Bind pods
This commit is contained in:
Tim Hockin 2014-10-16 16:58:03 -07:00
commit a1dc200fd4
34 changed files with 941 additions and 565 deletions

View File

@ -17,9 +17,64 @@ limitations under the License.
package api
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
// Codec is the identity codec for this package - it can only convert itself
// to itself.
var Codec = runtime.CodecFor(Scheme, "")
func init() {
Scheme.AddConversionFuncs(
// Convert ContainerManifest to BoundPod
func(in *ContainerManifest, out *BoundPod, s conversion.Scope) error {
out.Spec.Containers = in.Containers
out.Spec.Volumes = in.Volumes
out.Spec.RestartPolicy = in.RestartPolicy
out.ID = in.ID
out.UID = in.UUID
return nil
},
func(in *BoundPod, out *ContainerManifest, s conversion.Scope) error {
out.Containers = in.Spec.Containers
out.Volumes = in.Spec.Volumes
out.RestartPolicy = in.Spec.RestartPolicy
out.Version = "v1beta2"
out.ID = in.ID
out.UUID = in.UID
return nil
},
func(in *ContainerManifestList, out *BoundPods, s conversion.Scope) error {
if err := s.Convert(&in.Items, &out.Items, 0); err != nil {
return err
}
for i := range out.Items {
item := &out.Items[i]
item.ResourceVersion = in.ResourceVersion
}
return nil
},
func(in *BoundPods, out *ContainerManifestList, s conversion.Scope) error {
if err := s.Convert(&in.Items, &out.Items, 0); err != nil {
return err
}
out.ResourceVersion = in.ResourceVersion
return nil
},
// Convert Pod to BoundPod
func(in *Pod, out *BoundPod, s conversion.Scope) error {
if err := s.Convert(&in.DesiredState.Manifest, out, 0); err != nil {
return err
}
// Only copy a subset of fields, and override manifest attributes with the pod
// metadata
out.UID = in.UID
out.ID = in.ID
out.Namespace = in.Namespace
out.CreationTimestamp = in.CreationTimestamp
return nil
},
)
}

View File

@ -40,7 +40,9 @@ func init() {
&Binding{},
&Event{},
&EventList{},
&ContainerManifest{},
&ContainerManifestList{},
&BoundPod{},
&BoundPods{},
)
}
@ -61,5 +63,7 @@ func (*ServerOp) IsAnAPIObject() {}
func (*ServerOpList) IsAnAPIObject() {}
func (*Event) IsAnAPIObject() {}
func (*EventList) IsAnAPIObject() {}
func (*ContainerManifest) IsAnAPIObject() {}
func (*ContainerManifestList) IsAnAPIObject() {}
func (*BoundPod) IsAnAPIObject() {}
func (*BoundPods) IsAnAPIObject() {}

View File

@ -170,6 +170,10 @@ func TestTypes(t *testing.T) {
t.Errorf("Couldn't make a %v? %v", kind, err)
continue
}
if _, err := runtime.FindTypeMeta(item); err != nil {
t.Logf("%s is not a TypeMeta and cannot be round tripped: %v", kind, err)
continue
}
runTest(t, v1beta1.Codec, item)
runTest(t, v1beta2.Codec, item)
runTest(t, api.Codec, item)

View File

@ -42,7 +42,9 @@ func init() {
&ServerOpList{},
&Event{},
&EventList{},
&ContainerManifest{},
&ContainerManifestList{},
&BoundPod{},
&BoundPods{},
)
}
@ -63,5 +65,7 @@ func (*ServerOp) IsAnAPIObject() {}
func (*ServerOpList) IsAnAPIObject() {}
func (*Event) IsAnAPIObject() {}
func (*EventList) IsAnAPIObject() {}
func (*ContainerManifest) IsAnAPIObject() {}
func (*ContainerManifestList) IsAnAPIObject() {}
func (*BoundPod) IsAnAPIObject() {}
func (*BoundPods) IsAnAPIObject() {}

View File

@ -42,7 +42,9 @@ func init() {
&ServerOpList{},
&Event{},
&EventList{},
&ContainerManifest{},
&ContainerManifestList{},
&BoundPod{},
&BoundPods{},
)
}
@ -63,5 +65,7 @@ func (*ServerOp) IsAnAPIObject() {}
func (*ServerOpList) IsAnAPIObject() {}
func (*Event) IsAnAPIObject() {}
func (*EventList) IsAnAPIObject() {}
func (*ContainerManifest) IsAnAPIObject() {}
func (*ContainerManifestList) IsAnAPIObject() {}
func (*BoundPod) IsAnAPIObject() {}
func (*BoundPods) IsAnAPIObject() {}

View File

@ -437,3 +437,25 @@ func ValidateReadOnlyPersistentDisks(volumes []api.Volume) errs.ErrorList {
}
return allErrs
}
// ValidateBoundPod tests if required fields on a bound pod are set.
func ValidateBoundPod(pod *api.BoundPod) (errors []error) {
if !util.IsDNSSubdomain(pod.ID) {
errors = append(errors, errs.NewFieldInvalid("id", pod.ID))
}
if !util.IsDNSSubdomain(pod.Namespace) {
errors = append(errors, errs.NewFieldInvalid("namespace", pod.Namespace))
}
containerManifest := &api.ContainerManifest{
Version: "v1beta2",
ID: pod.ID,
UUID: pod.UID,
Containers: pod.Spec.Containers,
Volumes: pod.Spec.Volumes,
RestartPolicy: pod.Spec.RestartPolicy,
}
if errs := ValidateManifest(containerManifest); len(errs) != 0 {
errors = append(errors, errs...)
}
return errors
}

View File

@ -823,3 +823,21 @@ func TestValidateReplicationController(t *testing.T) {
}
}
}
func TestValidateBoundPodNoName(t *testing.T) {
errorCases := map[string]api.BoundPod{
// manifest is tested in api/validation_test.go, ensure it is invoked
"empty version": {TypeMeta: api.TypeMeta{ID: "test"}, Spec: api.PodSpec{Containers: []api.Container{{Name: ""}}}},
// Name
"zero-length name": {TypeMeta: api.TypeMeta{ID: ""}},
"name > 255 characters": {TypeMeta: api.TypeMeta{ID: strings.Repeat("a", 256)}},
"name not a DNS subdomain": {TypeMeta: api.TypeMeta{ID: "a.b.c."}},
"name with underscore": {TypeMeta: api.TypeMeta{ID: "a_b_c"}},
}
for k, v := range errorCases {
if errs := ValidateBoundPod(&v); len(errs) == 0 {
t.Errorf("expected failure for %s", k)
}
}
}

View File

@ -33,10 +33,12 @@ import (
)
func NewTestEtcdRegistry(client tools.EtcdClient) *etcdregistry.Registry {
registry := etcdregistry.NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicManifestFactory{
registry := etcdregistry.NewRegistry(
tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicBoundPodFactory{
ServiceRegistry: &registrytest.ServiceRegistry{},
})
},
)
return registry
}

View File

@ -20,8 +20,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Allowed returns true if manifests is a collection of manifests
// Allowed returns true if pods is a collection of bound pods
// which can run without conflict on a single minion.
func Allowed(manifests []api.ContainerManifest) bool {
return !PortsConflict(manifests)
func Allowed(pods []api.BoundPod) bool {
return !PortsConflict(pods)
}

View File

@ -30,27 +30,27 @@ func containerWithHostPorts(ports ...int) api.Container {
return c
}
func manifestWithContainers(containers ...api.Container) api.ContainerManifest {
m := api.ContainerManifest{}
func podWithContainers(containers ...api.Container) api.BoundPod {
m := api.BoundPod{}
for _, c := range containers {
m.Containers = append(m.Containers, c)
m.Spec.Containers = append(m.Spec.Containers, c)
}
return m
}
func TestAllowed(t *testing.T) {
table := []struct {
allowed bool
manifests []api.ContainerManifest
allowed bool
pods []api.BoundPod
}{
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(1, 2, 3),
containerWithHostPorts(4, 5, 6),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(7, 8, 9),
containerWithHostPorts(10, 11, 12),
),
@ -58,12 +58,12 @@ func TestAllowed(t *testing.T) {
},
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
@ -71,19 +71,19 @@ func TestAllowed(t *testing.T) {
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(3, 3),
),
},
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(6),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(6),
),
},
@ -91,8 +91,8 @@ func TestAllowed(t *testing.T) {
}
for _, item := range table {
if e, a := item.allowed, Allowed(item.manifests); e != a {
t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.manifests)
if e, a := item.allowed, Allowed(item.pods); e != a {
t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.pods)
}
}
}

View File

@ -22,10 +22,10 @@ import (
// PortsConflict returns true iff two containers attempt to expose
// the same host port.
func PortsConflict(manifests []api.ContainerManifest) bool {
func PortsConflict(pods []api.BoundPod) bool {
hostPorts := map[int]struct{}{}
for _, manifest := range manifests {
for _, container := range manifest.Containers {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort == 0 {
continue

View File

@ -21,7 +21,9 @@ import (
"reflect"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
@ -89,7 +91,7 @@ func (c *PodConfig) Sync() {
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod name to pod reference
pods map[string]map[string]*kubelet.Pod
pods map[string]map[string]*api.BoundPod
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order
@ -103,7 +105,7 @@ type podStorage struct {
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*kubelet.Pod),
pods: make(map[string]map[string]*api.BoundPod),
mode: mode,
updates: updates,
}
@ -136,12 +138,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
s.updates <- *updates
}
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
}
case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
}
default:
@ -161,7 +163,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
pods := s.pods[source]
if pods == nil {
pods = make(map[string]*kubelet.Pod)
pods = make(map[string]*api.BoundPod)
}
update := change.(kubelet.PodUpdate)
@ -175,11 +177,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
filtered := filterInvalidPods(update.Pods, source)
for _, ref := range filtered {
name := ref.Name
name := podUniqueName(ref)
if existing, found := pods[name]; found {
if !reflect.DeepEqual(existing.Manifest, ref.Manifest) {
if !reflect.DeepEqual(existing.Spec, ref.Spec) {
// this is an update
existing.Manifest = ref.Manifest
existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, *existing)
continue
}
@ -187,7 +189,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
continue
}
// this is an add
ref.Namespace = source
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
pods[name] = ref
adds.Pods = append(adds.Pods, *ref)
}
@ -195,7 +200,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
case kubelet.REMOVE:
glog.V(4).Infof("Removing a pod %v", update)
for _, value := range update.Pods {
name := value.Name
name := podUniqueName(&value)
if existing, found := pods[name]; found {
// this is a delete
delete(pods, name)
@ -209,23 +214,26 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
glog.V(4).Infof("Setting pods for source %s : %v", source, update)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[string]*kubelet.Pod)
pods = make(map[string]*api.BoundPod)
filtered := filterInvalidPods(update.Pods, source)
for _, ref := range filtered {
name := ref.Name
name := podUniqueName(ref)
if existing, found := oldPods[name]; found {
pods[name] = existing
if !reflect.DeepEqual(existing.Manifest, ref.Manifest) {
if !reflect.DeepEqual(existing.Spec, ref.Spec) {
// this is an update
existing.Manifest = ref.Manifest
existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, *existing)
continue
}
// this is a no-op
continue
}
ref.Namespace = source
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
pods[name] = ref
adds.Pods = append(adds.Pods, *ref)
}
@ -246,20 +254,21 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
return adds, updates, deletes
}
func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.Pod) {
func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) {
names := util.StringSet{}
for i := range pods {
var errors []error
if names.Has(pods[i].Name) {
errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].Name))
name := podUniqueName(&pods[i])
if names.Has(name) {
errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].ID))
} else {
names.Insert(pods[i].Name)
names.Insert(name)
}
if errs := kubelet.ValidatePod(&pods[i]); len(errs) != 0 {
if errs := validation.ValidateBoundPod(&pods[i]); len(errs) != 0 {
errors = append(errors, errs...)
}
if len(errors) > 0 {
glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].Name, source, errors)
glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].ID, source, errors)
continue
}
filtered = append(filtered, &pods[i])
@ -271,20 +280,32 @@ func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.P
func (s *podStorage) Sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
}
// Object implements config.Accessor
func (s *podStorage) MergedState() interface{} {
s.podLock.RLock()
defer s.podLock.RUnlock()
pods := make([]kubelet.Pod, 0)
for source, sourcePods := range s.pods {
var pods []api.BoundPod
for _, sourcePods := range s.pods {
for _, podRef := range sourcePods {
pod := *podRef
pod.Namespace = source
pods = append(pods, pod)
pod, err := api.Scheme.Copy(podRef)
if err != nil {
glog.Errorf("unable to copy pod: %v", err)
}
pods = append(pods, *pod.(*api.BoundPod))
}
}
return pods
}
// podUniqueName returns a value for a given pod that is unique across a source,
// which is the combination of namespace and ID.
func podUniqueName(pod *api.BoundPod) string {
namespace := pod.Namespace
if len(namespace) == 0 {
namespace = api.NamespaceDefault
}
return fmt.Sprintf("%s.%s", pod.ID, namespace)
}

View File

@ -18,6 +18,7 @@ package config
import (
"reflect"
"sort"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -32,7 +33,7 @@ func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
}
}
type sortedPods []kubelet.Pod
type sortedPods []api.BoundPod
func (s sortedPods) Len() int {
return len(s)
@ -41,25 +42,27 @@ func (s sortedPods) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedPods) Less(i, j int) bool {
if s[i].Namespace < s[j].Namespace {
return true
}
return s[i].Name < s[j].Name
return s[i].ID < s[j].ID
}
func CreateValidPod(name, namespace string) kubelet.Pod {
return kubelet.Pod{
Name: name,
Namespace: namespace,
Manifest: api.ContainerManifest{
Version: "v1beta1",
func CreateValidPod(name, namespace, source string) api.BoundPod {
return api.BoundPod{
TypeMeta: api.TypeMeta{
ID: name,
Namespace: namespace,
Annotations: map[string]string{kubelet.ConfigSourceAnnotationKey: source},
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
},
}
}
func CreatePodUpdate(op kubelet.PodOperation, pods ...kubelet.Pod) kubelet.PodUpdate {
newPods := make([]kubelet.Pod, len(pods))
func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate {
if len(pods) == 0 {
return kubelet.PodUpdate{Op: op}
}
newPods := make([]api.BoundPod, len(pods))
for i := range pods {
newPods[i] = pods[i]
}
@ -76,6 +79,7 @@ func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{},
func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kubelet.PodUpdate) {
for i := range expected {
update := <-ch
sort.Sort(sortedPods(update.Pods))
if !reflect.DeepEqual(expected[i], update) {
t.Fatalf("Expected %#v, Got %#v", expected[i], update)
}
@ -95,24 +99,63 @@ func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
}
func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", ""))
channel <- podUpdate
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET))
}
func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test")))
}
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
// see an update in another namespace
podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test")))
}
func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
// add an invalid update
podUpdate = CreatePodUpdate(kubelet.UPDATE, kubelet.Pod{Name: "foo"})
podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo"}})
channel <- podUpdate
expectNoPodUpdate(t, ch)
}
@ -121,16 +164,16 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
}
@ -139,16 +182,16 @@ func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod))
}
@ -157,21 +200,21 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
// an kubelet.ADD should be converted to kubelet.UPDATE
pod := CreateValidPod("foo", "test")
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
pod := CreateValidPod("foo", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.ADD, pod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
podUpdate = CreatePodUpdate(kubelet.REMOVE, kubelet.Pod{Name: "foo"})
podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo", Namespace: "new"}})
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod))
}
@ -180,20 +223,20 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""), CreateValidPod("foo2", ""), CreateValidPod("foo3", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"), CreateValidPod("foo2", "test"), CreateValidPod("foo3", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
// should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE
pod := CreateValidPod("foo2", "test")
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", ""), CreateValidPod("foo4", "test"))
pod := CreateValidPod("foo2", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch,
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "test")),
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, pod))
}

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"path"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -33,7 +34,7 @@ import (
)
func EtcdKeyForHost(hostname string) string {
return path.Join("/", "registry", "hosts", hostname, "kubelet")
return path.Join("/", "registry", "nodes", hostname, "boundpods")
}
type SourceEtcd struct {
@ -86,26 +87,27 @@ func (s *SourceEtcd) run() {
// eventToPods takes a watch.Event object, and turns it into a structured list of pods.
// It returns a list of containers, or an error if one occurs.
func eventToPods(ev watch.Event) ([]kubelet.Pod, error) {
pods := []kubelet.Pod{}
manifests, ok := ev.Object.(*api.ContainerManifestList)
func eventToPods(ev watch.Event) ([]api.BoundPod, error) {
pods := []api.BoundPod{}
boundPods, ok := ev.Object.(*api.BoundPods)
if !ok {
return pods, errors.New("unable to parse response as ContainerManifestList")
return pods, errors.New("unable to parse response as BoundPods")
}
for i, manifest := range manifests.Items {
name := manifest.ID
if name == "" {
name = fmt.Sprintf("%d", i+1)
for i, pod := range boundPods.Items {
if len(pod.ID) == 0 {
pod.ID = fmt.Sprintf("%d", i+1)
}
pods = append(pods, kubelet.Pod{
Name: name,
Manifest: manifest})
// TODO: generate random UID if not present
if pod.UID == "" && !pod.CreationTimestamp.IsZero() {
pod.UID = strconv.FormatInt(pod.CreationTimestamp.Unix(), 10)
}
// Backwards compatibility with old api servers
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
pods = append(pods, pod)
}
return pods, nil
}
func makeContainerKey(machine string) string {
return "/registry/hosts/" + machine + "/kubelet"
}

View File

@ -21,53 +21,52 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestEventToPods(t *testing.T) {
tests := []struct {
input watch.Event
pods []kubelet.Pod
pods []api.BoundPod
fail bool
}{
{
input: watch.Event{Object: nil},
pods: []kubelet.Pod{},
pods: []api.BoundPod{},
fail: true,
},
{
input: watch.Event{Object: &api.ContainerManifestList{}},
pods: []kubelet.Pod{},
input: watch.Event{Object: &api.BoundPods{}},
pods: []api.BoundPod{},
fail: false,
},
{
input: watch.Event{
Object: &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
{ID: "bar"},
Object: &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
},
},
pods: []kubelet.Pod{
{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}},
{Name: "bar", Manifest: api.ContainerManifest{ID: "bar"}},
pods: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo", Namespace: "default"}, Spec: api.PodSpec{}},
{TypeMeta: api.TypeMeta{ID: "bar", Namespace: "default"}, Spec: api.PodSpec{}},
},
fail: false,
},
{
input: watch.Event{
Object: &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: ""},
{ID: ""},
Object: &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "1"}},
{TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}},
},
},
},
pods: []kubelet.Pod{
{Name: "1", Manifest: api.ContainerManifest{ID: ""}},
{Name: "2", Manifest: api.ContainerManifest{ID: ""}},
pods: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "1", Namespace: "default"}, Spec: api.PodSpec{}},
{TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}, Spec: api.PodSpec{}},
},
fail: false,
},

View File

@ -29,8 +29,10 @@ import (
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"gopkg.in/v1/yaml"
)
@ -79,7 +81,7 @@ func (s *SourceFile) extractFromPath() error {
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET}
default:
return fmt.Errorf("path is not a directory or file")
@ -88,28 +90,29 @@ func (s *SourceFile) extractFromPath() error {
return nil
}
func extractFromDir(name string) ([]kubelet.Pod, error) {
pods := []kubelet.Pod{}
func extractFromDir(name string) ([]api.BoundPod, error) {
files, err := filepath.Glob(filepath.Join(name, "[^.]*"))
if err != nil {
return pods, err
return nil, err
}
if len(files) == 0 {
return nil, nil
}
sort.Strings(files)
pods := []api.BoundPod{}
for _, file := range files {
pod, err := extractFromFile(file)
if err != nil {
return []kubelet.Pod{}, err
return nil, err
}
pods = append(pods, pod)
}
return pods, nil
}
func extractFromFile(name string) (kubelet.Pod, error) {
var pod kubelet.Pod
func extractFromFile(name string) (api.BoundPod, error) {
var pod api.BoundPod
file, err := os.Open(name)
if err != nil {
@ -123,15 +126,23 @@ func extractFromFile(name string) (kubelet.Pod, error) {
return pod, err
}
if err := yaml.Unmarshal(data, &pod.Manifest); err != nil {
return pod, fmt.Errorf("could not unmarshal manifest: %v", err)
manifest := &api.ContainerManifest{}
// TODO: use api.Scheme.DecodeInto
if err := yaml.Unmarshal(data, manifest); err != nil {
return pod, err
}
podName := pod.Manifest.ID
if podName == "" {
podName = simpleSubdomainSafeHash(name)
if err := api.Scheme.Convert(manifest, &pod); err != nil {
return pod, err
}
pod.ID = simpleSubdomainSafeHash(name)
if len(pod.UID) == 0 {
pod.UID = simpleSubdomainSafeHash(name)
}
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
pod.Name = podName
return pod, nil
}

View File

@ -26,10 +26,57 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"gopkg.in/v1/yaml"
)
func ExampleManifestAndPod(id string) (api.ContainerManifest, api.BoundPod) {
manifest := api.ContainerManifest{
ID: id,
UUID: "uid",
Containers: []api.Container{
{
Name: "c" + id,
Image: "foo",
},
},
Volumes: []api.Volume{
{
Name: "host-dir",
Source: &api.VolumeSource{
HostDir: &api.HostDir{"/dir/path"},
},
},
},
}
expectedPod := api.BoundPod{
TypeMeta: api.TypeMeta{
ID: id,
UID: "uid",
Namespace: "default",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "c" + id,
Image: "foo",
},
},
Volumes: []api.Volume{
{
Name: "host-dir",
Source: &api.VolumeSource{
HostDir: &api.HostDir{"/dir/path"},
},
},
},
},
}
return manifest, expectedPod
}
func TestExtractFromNonExistentFile(t *testing.T) {
ch := make(chan interface{}, 1)
c := SourceFile{"/some/fake/file", ch}
@ -70,16 +117,18 @@ func TestReadFromFile(t *testing.T) {
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{
Name: "test",
Manifest: api.ContainerManifest{
ID: "test",
Version: "v1beta1",
expected := CreatePodUpdate(kubelet.SET, api.BoundPod{
TypeMeta: api.TypeMeta{
ID: simpleSubdomainSafeHash(file.Name()),
UID: simpleSubdomainSafeHash(file.Name()),
Namespace: "default",
},
Spec: api.PodSpec{
Containers: []api.Container{{Image: "test/image"}},
},
})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(2 * time.Millisecond):
@ -95,29 +144,31 @@ func TestExtractFromBadDataFile(t *testing.T) {
c := SourceFile{file.Name(), ch}
err := c.extractFromPath()
if err == nil {
t.Errorf("Expected error")
t.Fatalf("Expected error")
}
expectEmptyChannel(t, ch)
}
func TestExtractFromValidDataFile(t *testing.T) {
manifest := api.ContainerManifest{ID: ""}
manifest, expectedPod := ExampleManifestAndPod("id")
text, err := json.Marshal(manifest)
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
file := writeTestFile(t, os.TempDir(), "test_pod_config", string(text))
defer os.Remove(file.Name())
expectedPod.ID = simpleSubdomainSafeHash(file.Name())
ch := make(chan interface{}, 1)
c := SourceFile{file.Name(), ch}
err = c.extractFromPath()
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: simpleSubdomainSafeHash(file.Name()), Manifest: manifest})
expected := CreatePodUpdate(kubelet.SET, expectedPod)
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
@ -126,7 +177,7 @@ func TestExtractFromValidDataFile(t *testing.T) {
func TestExtractFromEmptyDir(t *testing.T) {
dirName, err := ioutil.TempDir("", "foo")
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
defer os.RemoveAll(dirName)
@ -134,7 +185,7 @@ func TestExtractFromEmptyDir(t *testing.T) {
c := SourceFile{dirName, ch}
err = c.extractFromPath()
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubelet.PodUpdate)
@ -145,54 +196,55 @@ func TestExtractFromEmptyDir(t *testing.T) {
}
func TestExtractFromDir(t *testing.T) {
manifests := []api.ContainerManifest{
{Version: "v1beta1", Containers: []api.Container{{Name: "1", Image: "foo"}}},
{Version: "v1beta1", Containers: []api.Container{{Name: "2", Image: "bar"}}},
}
manifest, expectedPod := ExampleManifestAndPod("1")
manifest2, expectedPod2 := ExampleManifestAndPod("2")
manifests := []api.ContainerManifest{manifest, manifest2}
pods := []api.BoundPod{expectedPod, expectedPod2}
files := make([]*os.File, len(manifests))
dirName, err := ioutil.TempDir("", "foo")
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
for i, manifest := range manifests {
data, err := json.Marshal(manifest)
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
file, err := ioutil.TempFile(dirName, manifest.ID)
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
name := file.Name()
if err := file.Close(); err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
ioutil.WriteFile(name, data, 0755)
files[i] = file
pods[i].ID = simpleSubdomainSafeHash(name)
}
ch := make(chan interface{}, 1)
c := SourceFile{dirName, ch}
err = c.extractFromPath()
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(
kubelet.SET,
kubelet.Pod{Name: simpleSubdomainSafeHash(files[0].Name()), Manifest: manifests[0]},
kubelet.Pod{Name: simpleSubdomainSafeHash(files[1].Name()), Manifest: manifests[1]},
)
expected := CreatePodUpdate(kubelet.SET, pods...)
sort.Sort(sortedPods(update.Pods))
sort.Sort(sortedPods(expected.Pods))
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
for i := range update.Pods {
if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 {
if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 {
t.Errorf("Expected no validation errors on %#v, Got %#v", update.Pods[i], errs)
}
}

View File

@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"gopkg.in/v1/yaml"
)
@ -79,6 +80,7 @@ func (s *SourceURL) extractFromURL() error {
// First try as if it's a single manifest
var manifest api.ContainerManifest
// TODO: should be api.Scheme.Decode
singleErr := yaml.Unmarshal(data, &manifest)
if singleErr == nil {
if errs := validation.ValidateManifest(&manifest); len(errs) > 0 {
@ -86,16 +88,23 @@ func (s *SourceURL) extractFromURL() error {
}
}
if singleErr == nil {
pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest}
if pod.Name == "" {
pod.Name = "1"
pod := api.BoundPod{}
if err := api.Scheme.Convert(&manifest, &pod); err != nil {
return err
}
s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET}
if len(pod.ID) == 0 {
pod.ID = "1"
}
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET}
return nil
}
// That didn't work, so try an array of manifests.
var manifests []api.ContainerManifest
// TODO: should be api.Scheme.Decode
multiErr := yaml.Unmarshal(data, &manifests)
// We're not sure if the person reading the logs is going to care about the single or
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
@ -113,18 +122,24 @@ func (s *SourceURL) extractFromURL() error {
// array of manifests (and no error) when unmarshaled as such. In that case,
// if the single manifest at least had a Version, we return the single-manifest
// error (if any).
if len(manifests) == 0 && manifest.Version != "" {
if len(manifests) == 0 && len(manifest.Version) != 0 {
return singleErr
}
pods := []kubelet.Pod{}
for i, manifest := range manifests {
pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest}
if pod.Name == "" {
pod.Name = fmt.Sprintf("%d", i+1)
}
pods = append(pods, pod)
list := api.ContainerManifestList{Items: manifests}
boundPods := &api.BoundPods{}
if err := api.Scheme.Convert(&list, boundPods); err != nil {
return err
}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
for i := range boundPods.Items {
pod := &boundPods.Items[i]
if len(pod.ID) == 0 {
pod.ID = fmt.Sprintf("%d", i+1)
}
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
}
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET}
return nil
}

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
@ -122,11 +123,12 @@ func TestExtractFromHTTP(t *testing.T) {
desc: "Single manifest",
manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"},
expected: CreatePodUpdate(kubelet.SET,
kubelet.Pod{
Name: "foo",
Manifest: api.ContainerManifest{
Version: "v1beta1",
ID: "foo",
api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "default",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
},
}),
@ -138,13 +140,19 @@ func TestExtractFromHTTP(t *testing.T) {
{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}},
},
expected: CreatePodUpdate(kubelet.SET,
kubelet.Pod{
Name: "1",
Manifest: api.ContainerManifest{Version: "v1beta1", ID: "", Containers: []api.Container{{Name: "1", Image: "foo"}}},
api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "1",
Namespace: "default",
},
Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}},
},
kubelet.Pod{
Name: "bar",
Manifest: api.ContainerManifest{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}},
api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "bar",
Namespace: "default",
},
Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}},
}),
},
{
@ -167,13 +175,14 @@ func TestExtractFromHTTP(t *testing.T) {
c := SourceURL{testServer.URL, ch, nil}
if err := c.extractFromURL(); err != nil {
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
continue
}
update := (<-ch).(kubelet.PodUpdate)
if !reflect.DeepEqual(testCase.expected, update) {
t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
}
for i := range update.Pods {
if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 {
if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 {
t.Errorf("%s: Expected no validation errors on %#v, Got %#v", testCase.desc, update.Pods[i], errs)
}
}

View File

@ -212,7 +212,7 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc
// TODO(dchen1107): Remove the old separator "--" by end of Oct
if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") &&
!strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") {
glog.Infof("Docker Container:%s is not managed by kubelet.", container.Names[0])
glog.Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
continue
}
result[DockerID(container.ID)] = container
@ -330,7 +330,7 @@ func inspectContainer(client DockerInterface, dockerID, containerName string) (*
}
// GetDockerPodInfo returns docker info for all containers in the pod/manifest.
func GetDockerPodInfo(client DockerInterface, manifest api.ContainerManifest, podFullName, uuid string) (api.PodInfo, error) {
func GetDockerPodInfo(client DockerInterface, manifest api.PodSpec, podFullName, uuid string) (api.PodInfo, error) {
info := api.PodInfo{}
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})

View File

@ -55,7 +55,7 @@ type CadvisorInterface interface {
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]Pod) error
SyncPods([]api.BoundPod) error
}
type volumeMap map[string]volume.Interface
@ -111,7 +111,7 @@ type Kubelet struct {
networkContainerImage string
podWorkers *podWorkers
resyncInterval time.Duration
pods []Pod
pods []api.BoundPod
// Optional, no events will be sent without it
etcdClient tools.EtcdClient
@ -213,7 +213,7 @@ func makeEnvironmentVariables(container *api.Container) []string {
return result
}
func makeBinds(pod *Pod, container *api.Container, podVolumes volumeMap) []string {
func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
for _, mount := range container.VolumeMounts {
vol, ok := podVolumes[mount.Name]
@ -276,10 +276,10 @@ func milliCPUToShares(milliCPU int) int {
return shares
}
func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volumeMap, error) {
func (kl *Kubelet) mountExternalVolumes(pod *api.BoundPod) (volumeMap, error) {
podVolumes := make(volumeMap)
for _, vol := range manifest.Volumes {
extVolume, err := volume.CreateVolumeBuilder(&vol, manifest.ID, kl.rootDirectory)
for _, vol := range pod.Spec.Volumes {
extVolume, err := volume.CreateVolumeBuilder(&vol, pod.ID, kl.rootDirectory)
if err != nil {
return nil, err
}
@ -323,18 +323,18 @@ func (kl *Kubelet) runHandler(podFullName, uuid string, container *api.Container
}
// Run a single container from a pod. Returns the docker container ID
func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) {
func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) {
envVariables := makeEnvironmentVariables(container)
binds := makeBinds(pod, container, podVolumes)
exposedPorts, portBindings := makePortsAndBindings(container)
opts := docker.CreateContainerOptions{
Name: dockertools.BuildDockerName(pod.Manifest.UUID, GetPodFullName(pod), container),
Name: dockertools.BuildDockerName(pod.UID, GetPodFullName(pod), container),
Config: &docker.Config{
Cmd: container.Command,
Env: envVariables,
ExposedPorts: exposedPorts,
Hostname: pod.Name,
Hostname: pod.ID,
Image: container.Image,
Memory: int64(container.Memory),
CpuShares: int64(milliCPUToShares(container.CPU)),
@ -358,7 +358,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
Privileged: privileged,
})
if err == nil && container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := kl.runHandler(GetPodFullName(pod), pod.Manifest.UUID, container, container.Lifecycle.PostStart)
handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
if handlerErr != nil {
kl.killContainerByID(dockerContainer.ID, "")
return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
@ -392,11 +392,11 @@ const (
)
// createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error) {
func (kl *Kubelet) createNetworkContainer(pod *api.BoundPod) (dockertools.DockerID, error) {
var ports []api.Port
// Docker only exports ports from the network container. Let's
// collect all of the relevant ports and export them.
for _, container := range pod.Manifest.Containers {
for _, container := range pod.Spec.Containers {
ports = append(ports, container.Ports...)
}
container := &api.Container{
@ -419,12 +419,12 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error
// Delete all containers in a pod (except the network container) returns the number of containers deleted
// and an error if one occurs.
func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) {
func (kl *Kubelet) deleteAllContainers(pod *api.BoundPod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) {
count := 0
errs := make(chan error, len(pod.Manifest.Containers))
errs := make(chan error, len(pod.Spec.Containers))
wg := sync.WaitGroup{}
for _, container := range pod.Manifest.Containers {
if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.Manifest.UUID, container.Name); found {
for _, container := range pod.Spec.Containers {
if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found {
count++
wg.Add(1)
go func() {
@ -451,9 +451,9 @@ func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerConta
type empty struct{}
func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContainers) error {
func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uuid := pod.Manifest.UUID
uuid := pod.UID
containersToKeep := make(map[dockertools.DockerID]empty)
killedContainers := make(map[dockertools.DockerID]empty)
@ -484,7 +484,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine
}
containersToKeep[netID] = empty{}
podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
glog.Errorf("Unable to mount volumes for pod %s: (%v) Skipping pod.", podFullName, err)
return err
@ -501,7 +501,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine
podState.PodIP = netInfo.PodIP
}
for _, container := range pod.Manifest.Containers {
for _, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container)
if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uuid, container.Name); found {
containerID := dockertools.DockerID(dockerContainer.ID)
@ -538,13 +538,13 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine
// TODO(dawnchen): error handling here?
}
if len(recentContainers) > 0 && pod.Manifest.RestartPolicy.Always == nil {
if pod.Manifest.RestartPolicy.Never != nil {
if len(recentContainers) > 0 && pod.Spec.RestartPolicy.Always == nil {
if pod.Spec.RestartPolicy.Never != nil {
glog.V(3).Infof("Already ran container with name %s--%s--%s, do nothing",
podFullName, uuid, container.Name)
continue
}
if pod.Manifest.RestartPolicy.OnFailure != nil {
if pod.Spec.RestartPolicy.OnFailure != nil {
// Check the exit code of last run
if recentContainers[0].State.ExitCode == 0 {
glog.V(3).Infof("Already successfully ran container with name %s--%s--%s, do nothing",
@ -605,11 +605,11 @@ type podContainer struct {
// Stores all volumes defined by the set of pods into a map.
// Keys for each entry are in the format (POD_ID)/(VOLUME_NAME)
func getDesiredVolumes(pods []Pod) map[string]api.Volume {
func getDesiredVolumes(pods []api.BoundPod) map[string]api.Volume {
desiredVolumes := make(map[string]api.Volume)
for _, pod := range pods {
for _, volume := range pod.Manifest.Volumes {
identifier := path.Join(pod.Manifest.ID, volume.Name)
for _, volume := range pod.Spec.Volumes {
identifier := path.Join(pod.ID, volume.Name)
desiredVolumes[identifier] = volume
}
}
@ -618,7 +618,7 @@ func getDesiredVolumes(pods []Pod) map[string]api.Volume {
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
func (kl *Kubelet) reconcileVolumes(pods []Pod) error {
func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error {
desiredVolumes := getDesiredVolumes(pods)
currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory)
for name, vol := range currentVolumes {
@ -637,7 +637,7 @@ func (kl *Kubelet) reconcileVolumes(pods []Pod) error {
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []Pod) error {
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
glog.V(4).Infof("Desired [%s]: %+v", kl.hostname, pods)
var err error
desiredContainers := make(map[podContainer]empty)
@ -652,11 +652,11 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
for ix := range pods {
pod := &pods[ix]
podFullName := GetPodFullName(pod)
uuid := pod.Manifest.UUID
uuid := pod.UID
// Add all containers (including net) to the map.
desiredContainers[podContainer{podFullName, uuid, networkContainerName}] = empty{}
for _, cont := range pod.Manifest.Containers {
for _, cont := range pod.Spec.Containers {
desiredContainers[podContainer{podFullName, uuid, cont.Name}] = empty{}
}
@ -693,13 +693,13 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
}
// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func filterHostPortConflicts(pods []Pod) []Pod {
filtered := []Pod{}
func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
filtered := []api.BoundPod{}
ports := map[int]bool{}
extract := func(p *api.Port) int { return p.HostPort }
for i := range pods {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Manifest.Containers, ports, extract); len(errs) != 0 {
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs)
continue
}
@ -784,10 +784,10 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
// GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) {
var manifest api.ContainerManifest
var manifest api.PodSpec
for _, pod := range kl.pods {
if GetPodFullName(&pod) == podFullName {
manifest = pod.Manifest
manifest = pod.Spec
break
}
}

View File

@ -125,7 +125,7 @@ func TestKillContainer(t *testing.T) {
}
type channelReader struct {
list [][]Pod
list [][]api.BoundPod
wg sync.WaitGroup
}
@ -145,7 +145,7 @@ func startReading(channel <-chan interface{}) *channelReader {
return cr
}
func (cr *channelReader) GetList() [][]Pod {
func (cr *channelReader) GetList() [][]api.BoundPod {
cr.wg.Wait()
return cr.list
}
@ -156,21 +156,23 @@ func TestSyncPodsDoesNothing(t *testing.T) {
fakeDocker.ContainerList = []docker.APIContainers{
{
// format is k8s_<container-id>_<pod-fullname>
Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.test"},
Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.new.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
err := kubelet.SyncPods([]Pod{
err := kubelet.SyncPods([]api.BoundPod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
container,
},
@ -209,12 +211,14 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.networkContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
err := kubelet.SyncPods([]Pod{
err := kubelet.SyncPods([]api.BoundPod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
@ -242,8 +246,8 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
}
if len(fakeDocker.Created) != 2 ||
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) {
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[1]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.Unlock()
@ -255,12 +259,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
puller.HasImages = []string{}
kubelet.networkContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
err := kubelet.SyncPods([]Pod{
err := kubelet.SyncPods([]api.BoundPod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
@ -282,8 +288,8 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
}
if len(fakeDocker.Created) != 2 ||
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) {
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[1]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.Unlock()
@ -294,16 +300,18 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) {
fakeDocker.ContainerList = []docker.APIContainers{
{
// network container
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
err := kubelet.SyncPods([]Pod{
err := kubelet.SyncPods([]api.BoundPod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
@ -320,7 +328,7 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) {
fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) {
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.Unlock()
@ -333,16 +341,18 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) {
fakeDocker.ContainerList = []docker.APIContainers{
{
// network container
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
err := kubelet.SyncPods([]Pod{
err := kubelet.SyncPods([]api.BoundPod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "bar",
@ -370,7 +380,7 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) {
fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) {
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.Unlock()
@ -384,16 +394,18 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
fakeDocker.ContainerList = []docker.APIContainers{
{
// format is k8s_<container-id>_<pod-fullname>
Names: []string{"/k8s_bar_foo.test"},
Names: []string{"/k8s_bar_foo.new.test"},
ID: "1234",
},
}
err := kubelet.SyncPods([]Pod{
err := kubelet.SyncPods([]api.BoundPod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
@ -425,12 +437,12 @@ func TestSyncPodsDeletes(t *testing.T) {
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_foo_bar.test"},
Names: []string{"/k8s_foo_bar.new.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
{
@ -438,7 +450,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]Pod{})
err := kubelet.SyncPods([]api.BoundPod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -463,30 +475,32 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_foo_bar.test_1"},
Names: []string{"/k8s_foo_bar.new.test_1"},
ID: "1234",
},
"9876": &docker.APIContainers{
// network container
Names: []string{"/k8s_net_bar.test_"},
Names: []string{"/k8s_net_bar.new.test_"},
ID: "9876",
},
"4567": &docker.APIContainers{
// Duplicate for the same container.
Names: []string{"/k8s_foo_bar.test_2"},
Names: []string{"/k8s_foo_bar.new.test_2"},
ID: "4567",
},
"2304": &docker.APIContainers{
// Container for another pod, untouched.
Names: []string{"/k8s_baz_fiz.test_6"},
Names: []string{"/k8s_baz_fiz.new.test_6"},
ID: "2304",
},
}
err := kubelet.syncPod(&Pod{
Name: "bar",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "bar",
err := kubelet.syncPod(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "bar",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
@ -520,20 +534,22 @@ func TestSyncPodBadHash(t *testing.T) {
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_bar.1234_foo.test"},
Names: []string{"/k8s_bar.1234_foo.new.test"},
ID: "1234",
},
"9876": &docker.APIContainers{
// network container
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
err := kubelet.syncPod(&Pod{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
err := kubelet.syncPod(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
@ -562,20 +578,22 @@ func TestSyncPodUnhealthy(t *testing.T) {
dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_bar_foo.test"},
Names: []string{"/k8s_bar_foo.new.test"},
ID: "1234",
},
"9876": &docker.APIContainers{
// network container
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
err := kubelet.syncPod(&Pod{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
err := kubelet.syncPod(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar",
LivenessProbe: &api.LivenessProbe{
@ -629,25 +647,31 @@ func TestMakeEnvVariables(t *testing.T) {
func TestMountExternalVolumes(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
manifest := api.ContainerManifest{
Volumes: []api.Volume{
{
Name: "host-dir",
Source: &api.VolumeSource{
HostDir: &api.HostDir{"/dir/path"},
pod := api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "test",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "host-dir",
Source: &api.VolumeSource{
HostDir: &api.HostDir{"/dir/path"},
},
},
},
},
}
podVolumes, _ := kubelet.mountExternalVolumes(&manifest)
podVolumes, _ := kubelet.mountExternalVolumes(&pod)
expectedPodVolumes := make(volumeMap)
expectedPodVolumes["host-dir"] = &volume.HostDir{"/dir/path"}
if len(expectedPodVolumes) != len(podVolumes) {
t.Errorf("Unexpected volumes. Expected %#v got %#v. Manifest was: %#v", expectedPodVolumes, podVolumes, manifest)
t.Errorf("Unexpected volumes. Expected %#v got %#v. Manifest was: %#v", expectedPodVolumes, podVolumes, pod)
}
for name, expectedVolume := range expectedPodVolumes {
if _, ok := podVolumes[name]; !ok {
t.Errorf("Pod volumes map is missing key: %s. %#v", expectedVolume, podVolumes)
t.Errorf("api.BoundPod volumes map is missing key: %s. %#v", expectedVolume, podVolumes)
}
}
}
@ -678,9 +702,11 @@ func TestMakeVolumesAndBinds(t *testing.T) {
},
}
pod := Pod{
Name: "pod",
Namespace: "test",
pod := api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "pod",
Namespace: "test",
},
}
podVolumes := volumeMap{
@ -769,26 +795,26 @@ func TestMakePortsAndBindings(t *testing.T) {
}
func TestCheckHostPortConflicts(t *testing.T) {
successCaseAll := []Pod{
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
successCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
successCaseNew := Pod{
Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}},
successCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}},
}
expected := append(successCaseAll, successCaseNew)
if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
failureCaseAll := []Pod{
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
failureCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
failureCaseNew := Pod{
Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
failureCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
}
if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
@ -965,7 +991,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
podNamespace := "etcd"
containerName := "containerFoo"
output, err := kubelet.RunInContainer(
GetPodFullName(&Pod{Name: podName, Namespace: podNamespace}),
GetPodFullName(&api.BoundPod{TypeMeta: api.TypeMeta{ID: podName, Namespace: podNamespace}}),
"",
containerName,
[]string{"ls"})
@ -990,13 +1016,19 @@ func TestRunInContainer(t *testing.T) {
fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
Names: []string{"/k8s_" + containerName + "_" + podName + "." + podNamespace + "_1234"},
Names: []string{"/k8s_" + containerName + "_" + podName + "." + podNamespace + ".test_1234"},
},
}
cmd := []string{"ls"}
_, err := kubelet.RunInContainer(
GetPodFullName(&Pod{Name: podName, Namespace: podNamespace}),
GetPodFullName(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: podName,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
}),
"",
containerName,
cmd)
@ -1128,15 +1160,17 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
dockerContainers := dockertools.DockerContainers{
"9876": &docker.APIContainers{
// network container
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
err := kubelet.syncPod(&Pod{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
err := kubelet.syncPod(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar",
Lifecycle: &api.Lifecycle{

View File

@ -20,6 +20,7 @@ import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/golang/glog"
)
@ -32,7 +33,7 @@ const (
)
type RunPodResult struct {
Pod *Pod
Pod *api.BoundPod
Err error
}
@ -50,7 +51,7 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {
}
// runOnce runs a given set of pods and returns their status.
func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) {
func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err error) {
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
@ -72,10 +73,10 @@ func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) {
results = append(results, res)
if res.Err != nil {
// TODO(proppy): report which containers failed the pod.
glog.Infof("failed to start pod %q: %v", res.Pod.Name, res.Err)
failedPods = append(failedPods, res.Pod.Name)
glog.Infof("failed to start pod %q: %v", res.Pod.ID, res.Err)
failedPods = append(failedPods, res.Pod.ID)
} else {
glog.Infof("started pod %q", res.Pod.Name)
glog.Infof("started pod %q", res.Pod.ID)
}
}
if len(failedPods) > 0 {
@ -86,7 +87,7 @@ func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) {
}
// runPod runs a single pod and wait until all containers are running.
func (kl *Kubelet) runPod(pod Pod) error {
func (kl *Kubelet) runPod(pod api.BoundPod) error {
delay := RunOnceRetryDelay
retry := 0
for {
@ -95,18 +96,18 @@ func (kl *Kubelet) runPod(pod Pod) error {
return fmt.Errorf("failed to get kubelet docker containers: %v", err)
}
if running := kl.isPodRunning(pod, dockerContainers); running {
glog.Infof("pod %q containers running", pod.Name)
glog.Infof("pod %q containers running", pod.ID)
return nil
}
glog.Infof("pod %q containers not running: syncing", pod.Name)
glog.Infof("pod %q containers not running: syncing", pod.ID)
if err = kl.syncPod(&pod, dockerContainers); err != nil {
return fmt.Errorf("error syncing pod: %v", err)
}
if retry >= RunOnceMaxRetries {
return fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries)
return fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.ID, RunOnceMaxRetries)
}
// TODO(proppy): health checking would be better than waiting + checking the state at the next iteration.
glog.Infof("pod %q containers synced, waiting for %v", pod.Name, delay)
glog.Infof("pod %q containers synced, waiting for %v", pod.ID, delay)
<-time.After(delay)
retry++
delay *= RunOnceRetryDelayBackoff
@ -114,9 +115,9 @@ func (kl *Kubelet) runPod(pod Pod) error {
}
// isPodRunning returns true if all containers of a manifest are running.
func (kl *Kubelet) isPodRunning(pod Pod, dockerContainers dockertools.DockerContainers) bool {
for _, container := range pod.Manifest.Containers {
if dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.Manifest.UUID, container.Name); !found || dockerContainer.Status != "running" {
func (kl *Kubelet) isPodRunning(pod api.BoundPod, dockerContainers dockertools.DockerContainers) bool {
for _, container := range pod.Spec.Containers {
if dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.UID, container.Name); !found || dockerContainer.Status != "running" {
glog.Infof("container %q not found (%v) or not running: %#v", container.Name, found, dockerContainer)
return false
}

View File

@ -69,12 +69,12 @@ func TestRunOnce(t *testing.T) {
kb := &Kubelet{}
podContainers := []docker.APIContainers{
{
Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.test"},
Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.new.test"},
ID: "1234",
Status: "running",
},
{
Names: []string{"/k8s_net_foo.test_"},
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
Status: "running",
},
@ -106,12 +106,14 @@ func TestRunOnce(t *testing.T) {
t: t,
}
kb.dockerPuller = &dockertools.FakeDockerPuller{}
results, err := kb.runOnce([]Pod{
results, err := kb.runOnce([]api.BoundPod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
TypeMeta: api.TypeMeta{
ID: "foo",
Namespace: "new",
Annotations: map[string]string{ConfigSourceAnnotationKey: "test"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
@ -124,7 +126,7 @@ func TestRunOnce(t *testing.T) {
if results[0].Err != nil {
t.Errorf("unexpected run pod error: %v", results[0].Err)
}
if results[0].Pod.Name != "foo" {
t.Errorf("unexpected pod: %q", results[0].Pod.Name)
if results[0].Pod.ID != "foo" {
t.Errorf("unexpected pod: %q", results[0].Pod.ID)
}
}

View File

@ -119,15 +119,26 @@ func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) {
return
}
// This is to provide backward compatibility. It only supports a single manifest
var pod Pod
err = yaml.Unmarshal(data, &pod.Manifest)
var pod api.BoundPod
var containerManifest api.ContainerManifest
err = yaml.Unmarshal(data, &containerManifest)
if err != nil {
s.error(w, err)
return
}
pod.ID = containerManifest.ID
pod.UID = containerManifest.UUID
pod.Spec.Containers = containerManifest.Containers
pod.Spec.Volumes = containerManifest.Volumes
pod.Spec.RestartPolicy = containerManifest.RestartPolicy
//TODO: sha1 of manifest?
pod.Name = "1"
s.updates <- PodUpdate{[]Pod{pod}, SET}
if pod.ID == "" {
pod.ID = "1"
}
if pod.UID == "" {
pod.UID = "1"
}
s.updates <- PodUpdate{[]api.BoundPod{pod}, SET}
}
@ -139,16 +150,16 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) {
s.error(w, err)
return
}
var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests)
var specs []api.PodSpec
err = yaml.Unmarshal(data, &specs)
if err != nil {
s.error(w, err)
return
}
pods := make([]Pod, len(manifests))
for i := range manifests {
pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Manifest = manifests[i]
pods := make([]api.BoundPod, len(specs))
for i := range specs {
pods[i].ID = fmt.Sprintf("%d", i+1)
pods[i].Spec = specs[i]
}
s.updates <- PodUpdate{pods, SET}
@ -186,7 +197,14 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
follow, _ := strconv.ParseBool(uriValues.Get("follow"))
tail := uriValues.Get("tail")
podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"})
podFullName := GetPodFullName(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: podID,
// TODO: I am broken
Namespace: api.NamespaceDefault,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
fw := FlushWriter{writer: w}
if flusher, ok := w.(http.Flusher); ok {
@ -216,10 +234,17 @@ func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) {
return
}
// TODO: backwards compatibility with existing API, needs API change
podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"})
podFullName := GetPodFullName(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: podID,
// TODO: I am broken
Namespace: api.NamespaceDefault,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
info, err := s.host.GetPodInfo(podFullName, podUUID)
if err == dockertools.ErrNoContainersInPod {
http.Error(w, "Pod does not exist", http.StatusNotFound)
http.Error(w, "api.BoundPod does not exist", http.StatusNotFound)
return
}
if err != nil {
@ -283,7 +308,14 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
http.Error(w, "Unexpected path for command running", http.StatusBadRequest)
return
}
podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"})
podFullName := GetPodFullName(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: podID,
// TODO: I am broken
Namespace: api.NamespaceDefault,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
command := strings.Split(u.Query().Get("cmd"), " ")
data, err := s.host.RunInContainer(podFullName, uuid, container, command)
if err != nil {
@ -327,10 +359,24 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
errors.New("pod level status currently unimplemented")
case 3:
// Backward compatibility without uuid information
podFullName := GetPodFullName(&Pod{Name: components[1], Namespace: "etcd"})
podFullName := GetPodFullName(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: components[1],
// TODO: I am broken
Namespace: api.NamespaceDefault,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query)
case 4:
podFullName := GetPodFullName(&Pod{Name: components[1], Namespace: "etcd"})
podFullName := GetPodFullName(&api.BoundPod{
TypeMeta: api.TypeMeta{
ID: components[1],
// TODO: I am broken
Namespace: "",
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
stats, err = s.host.GetContainerInfo(podFullName, components[2], components[2], &query)
default:
http.Error(w, "unknown resource.", http.StatusNotFound)

View File

@ -101,7 +101,23 @@ func readResp(resp *http.Response) (string, error) {
func TestContainer(t *testing.T) {
fw := newServerTest()
expected := []api.ContainerManifest{
{ID: "test_manifest"},
{
ID: "test_manifest",
UUID: "value",
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
}
body := bytes.NewBuffer([]byte(util.EncodeJSON(expected[0]))) // Only send a single ContainerManifest
resp, err := http.Post(fw.testHTTPServer.URL+"/container", "application/json", body)
@ -114,7 +130,29 @@ func TestContainer(t *testing.T) {
if len(received) != 1 {
t.Errorf("Expected 1 manifest, but got %v", len(received))
}
expectedPods := []Pod{{Name: "1", Manifest: expected[0]}}
expectedPods := []api.BoundPod{
{
TypeMeta: api.TypeMeta{
ID: "test_manifest",
UID: "value",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
},
}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
@ -123,8 +161,38 @@ func TestContainer(t *testing.T) {
func TestContainers(t *testing.T) {
fw := newServerTest()
expected := []api.ContainerManifest{
{ID: "test_manifest_1"},
{ID: "test_manifest_2"},
{
ID: "test_manifest_1",
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
{
ID: "test_manifest_2",
Containers: []api.Container{
{
Name: "container2",
},
},
Volumes: []api.Volume{
{
Name: "test2",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
}
body := bytes.NewBuffer([]byte(util.EncodeJSON(expected)))
resp, err := http.Post(fw.testHTTPServer.URL+"/containers", "application/json", body)
@ -137,7 +205,48 @@ func TestContainers(t *testing.T) {
if len(received) != 1 {
t.Errorf("Expected 1 update, but got %v", len(received))
}
expectedPods := []Pod{{Name: "1", Manifest: expected[0]}, {Name: "2", Manifest: expected[1]}}
expectedPods := []api.BoundPod{
{
TypeMeta: api.TypeMeta{
ID: "1",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
},
{
TypeMeta: api.TypeMeta{
ID: "2",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "container2",
},
},
Volumes: []api.Volume{
{
Name: "test2",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
},
}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
@ -149,7 +258,7 @@ func TestPodInfo(t *testing.T) {
"goodpod": api.ContainerStatus{},
}
fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) {
if name == "goodpod.etcd" {
if name == "goodpod.default.etcd" {
return expected, nil
}
return nil, fmt.Errorf("bad pod %s", name)
@ -175,7 +284,7 @@ func TestContainerInfo(t *testing.T) {
fw := newServerTest()
expectedInfo := &info.ContainerInfo{}
podID := "somepod"
expectedPodID := "somepod" + ".etcd"
expectedPodID := "somepod" + ".default.etcd"
expectedContainerName := "goodcontainer"
fw.fakeKubelet.containerInfoFunc = func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
if podID != expectedPodID || containerName != expectedContainerName {
@ -278,7 +387,7 @@ func TestServeRunInContainer(t *testing.T) {
fw := newServerTest()
output := "foo bar"
podName := "foo"
expectedPodName := podName + ".etcd"
expectedPodName := podName + ".default.etcd"
expectedContainerName := "baz"
expectedCommand := "ls -a"
fw.fakeKubelet.runFunc = func(podFullName, uuid, containerName string, cmd []string) ([]byte, error) {
@ -317,7 +426,7 @@ func TestServeRunInContainerWithUUID(t *testing.T) {
fw := newServerTest()
output := "foo bar"
podName := "foo"
expectedPodName := podName + ".etcd"
expectedPodName := podName + ".default.etcd"
expectedUuid := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720"
expectedContainerName := "baz"
expectedCommand := "ls -a"
@ -360,7 +469,7 @@ func TestContainerLogs(t *testing.T) {
fw := newServerTest()
output := "foo bar"
podName := "foo"
expectedPodName := podName + ".etcd"
expectedPodName := podName + ".default.etcd"
expectedContainerName := "baz"
expectedTail := ""
expectedFollow := false
@ -399,7 +508,7 @@ func TestContainerLogsWithTail(t *testing.T) {
fw := newServerTest()
output := "foo bar"
podName := "foo"
expectedPodName := podName + ".etcd"
expectedPodName := podName + ".default.etcd"
expectedContainerName := "baz"
expectedTail := "5"
expectedFollow := false
@ -438,7 +547,7 @@ func TestContainerLogsWithFollow(t *testing.T) {
fw := newServerTest()
output := "foo bar"
podName := "foo"
expectedPodName := podName + ".etcd"
expectedPodName := podName + ".default.etcd"
expectedContainerName := "baz"
expectedTail := ""
expectedFollow := true

View File

@ -22,13 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Pod represents the structure of a pod on the Kubelet, distinct from the apiserver
// representation of a Pod.
type Pod struct {
Namespace string
Name string
Manifest api.ContainerManifest
}
const ConfigSourceAnnotationKey = "kubernetes/config.source"
// PodOperation defines what changes will be made on a pod configuration.
type PodOperation int
@ -48,13 +42,13 @@ const (
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty array and Op to SET.
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
type PodUpdate struct {
Pods []Pod
Pods []api.BoundPod
Op PodOperation
}
// GetPodFullName returns a name that full identifies a pod across all config sources.
func GetPodFullName(pod *Pod) string {
return fmt.Sprintf("%s.%s", pod.Name, pod.Namespace)
// GetPodFullName returns a name that uniquely identifies a pod across all config sources.
func GetPodFullName(pod *api.BoundPod) string {
return fmt.Sprintf("%s.%s.%s", pod.ID, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey])
}

View File

@ -1,33 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func ValidatePod(pod *Pod) (errors []error) {
if !util.IsDNSSubdomain(pod.Name) {
errors = append(errors, apierrs.NewFieldInvalid("name", pod.Name))
}
if errs := validation.ValidateManifest(&pod.Manifest); len(errs) != 0 {
errors = append(errors, errs...)
}
return errors
}

View File

@ -1,43 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet_test
import (
"strings"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
. "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
)
func TestValidatePodNoName(t *testing.T) {
errorCases := map[string]Pod{
// manifest is tested in api/validation_test.go, ensure it is invoked
"empty version": {Name: "test", Manifest: api.ContainerManifest{Version: "", ID: "abc"}},
// Name
"zero-length name": {Name: "", Manifest: api.ContainerManifest{Version: "v1beta1"}},
"name > 255 characters": {Name: strings.Repeat("a", 256), Manifest: api.ContainerManifest{Version: "v1beta1"}},
"name not a DNS subdomain": {Name: "a.b.c.", Manifest: api.ContainerManifest{Version: "v1beta1"}},
"name with underscore": {Name: "a_b_c", Manifest: api.ContainerManifest{Version: "v1beta1"}},
}
for k, v := range errorCases {
if errs := ValidatePod(&v); len(errs) == 0 {
t.Errorf("expected failure for %s", k)
}
}
}

View File

@ -89,15 +89,15 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
func New(c *Config) *Master {
minionRegistry := makeMinionRegistry(c)
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
manifestFactory := &pod.BasicManifestFactory{
boundPodFactory := &pod.BasicBoundPodFactory{
ServiceRegistry: serviceRegistry,
}
m := &Master{
podRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory),
podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
serviceRegistry: serviceRegistry,
endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())),
minionRegistry: minionRegistry,
client: c.Client,

View File

@ -51,15 +51,15 @@ const (
// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd.
type Registry struct {
tools.EtcdHelper
manifestFactory pod.ManifestFactory
boundPodFactory pod.BoundPodFactory
}
// NewRegistry creates an etcd registry.
func NewRegistry(helper tools.EtcdHelper, manifestFactory pod.ManifestFactory) *Registry {
func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry {
registry := &Registry{
EtcdHelper: helper,
}
registry.manifestFactory = manifestFactory
registry.boundPodFactory = boundPodFactory
return registry
}
@ -177,7 +177,7 @@ func (r *Registry) GetPod(ctx api.Context, id string) (*api.Pod, error) {
}
func makeContainerKey(machine string) string {
return "/registry/hosts/" + machine + "/kubelet"
return "/registry/nodes/" + machine + "/boundpods"
}
// CreatePod creates a pod based on a specification.
@ -230,18 +230,18 @@ func (r *Registry) assignPod(ctx api.Context, podID string, machine string) erro
return err
}
// TODO: move this to a watch/rectification loop.
manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod)
pod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
contKey := makeContainerKey(machine)
err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
if !constraint.Allowed(manifests.Items) {
err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
pods := *in.(*api.BoundPods)
pods.Items = append(pods.Items, *pod)
if !constraint.Allowed(pods.Items) {
return nil, fmt.Errorf("The assignment would cause a constraint violation")
}
return &manifests, nil
return &pods, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack that
@ -321,13 +321,13 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
}
// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := in.(*api.ContainerManifestList)
newManifests := make([]api.ContainerManifest, 0, len(manifests.Items))
return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, manifest := range manifests.Items {
if manifest.ID != podID {
newManifests = append(newManifests, manifest)
for _, pod := range pods.Items {
if pod.ID != podID {
newPods = append(newPods, pod)
} else {
found = true
}
@ -336,10 +336,10 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", podID, manifests)
glog.Warningf("Couldn't find: %s in %#v", podID, pods)
}
manifests.Items = newManifests
return manifests, nil
pods.Items = newPods
return pods, nil
})
}

View File

@ -36,7 +36,7 @@ import (
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicManifestFactory{
&pod.BasicBoundPodFactory{
ServiceRegistry: &registrytest.ServiceRegistry{},
})
return registry
@ -160,7 +160,7 @@ func TestEtcdCreatePod(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0)
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
TypeMeta: api.TypeMeta{
@ -199,15 +199,15 @@ func TestEtcdCreatePod(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
@ -268,7 +268,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -310,7 +310,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -355,15 +355,15 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
@ -378,9 +378,9 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "bar"},
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
@ -422,15 +422,15 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 2 || manifests.Items[1].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 2 || boundPods.Items[1].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
@ -511,7 +511,7 @@ func TestEtcdUpdatePodScheduled(t *testing.T) {
},
}), 1)
contKey := "/registry/hosts/machine/kubelet"
contKey := "/registry/nodes/machine/boundpods"
fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{
@ -586,9 +586,9 @@ func TestEtcdDeletePod(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
@ -602,13 +602,13 @@ func TestEtcdDeletePod(t *testing.T) {
} else if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var manifests api.ContainerManifestList
latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 0 {
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 0 {
t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value)
}
}
@ -622,10 +622,10 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
{ID: "bar"},
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
@ -640,17 +640,17 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var manifests api.ContainerManifestList
latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 1 {
t.Fatalf("Unexpected manifest set: %#v, expected empty", manifests)
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 1 {
t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods)
}
if manifests.Items[0].ID != "bar" {
t.Errorf("Deleted wrong manifest: %#v", manifests)
if boundPods.Items[0].ID != "bar" {
t.Errorf("Deleted wrong boundPod: %#v", boundPods)
}
}

View File

@ -21,24 +21,27 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
)
type ManifestFactory interface {
type BoundPodFactory interface {
// Make a container object for a given pod, given the machine that the pod is running on.
MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error)
MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error)
}
type BasicManifestFactory struct {
type BasicBoundPodFactory struct {
// TODO: this should really point at the API rather than a registry
ServiceRegistry service.Registry
}
func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) {
func (b *BasicBoundPodFactory) MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) {
envVars, err := service.GetServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine)
if err != nil {
return api.ContainerManifest{}, err
return nil, err
}
for ix, container := range pod.DesiredState.Manifest.Containers {
pod.DesiredState.Manifest.ID = pod.ID
pod.DesiredState.Manifest.Containers[ix].Env = append(container.Env, envVars...)
boundPod := &api.BoundPod{}
if err := api.Scheme.Convert(pod, boundPod); err != nil {
return nil, err
}
return pod.DesiredState.Manifest, nil
for ix, container := range boundPod.Spec.Containers {
boundPod.Spec.Containers[ix].Env = append(container.Env, envVars...)
}
return boundPod, nil
}

View File

@ -25,13 +25,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestMakeManifestNoServices(t *testing.T) {
func TestMakeBoundPodNoServices(t *testing.T) {
registry := registrytest.ServiceRegistry{}
factory := &BasicManifestFactory{
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
}
manifest, err := factory.MakeManifest("machine", api.Pod{
pod, err := factory.MakeBoundPod("machine", &api.Pod{
TypeMeta: api.TypeMeta{ID: "foobar"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
@ -44,19 +44,19 @@ func TestMakeManifestNoServices(t *testing.T) {
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
container := manifest.Containers[0]
container := pod.Spec.Containers[0]
if len(container.Env) != 0 {
t.Errorf("Expected zero env vars, got: %#v", manifest)
t.Errorf("Expected zero env vars, got: %#v", pod)
}
if manifest.ID != "foobar" {
t.Errorf("Failed to assign ID to manifest: %#v", manifest.ID)
if pod.ID != "foobar" {
t.Errorf("Failed to assign ID to pod: %#v", pod.ID)
}
}
func TestMakeManifestServices(t *testing.T) {
func TestMakeBoundPodServices(t *testing.T) {
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
@ -72,11 +72,11 @@ func TestMakeManifestServices(t *testing.T) {
},
},
}
factory := &BasicManifestFactory{
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
}
manifest, err := factory.MakeManifest("machine", api.Pod{
pod, err := factory.MakeBoundPod("machine", &api.Pod{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
@ -88,10 +88,10 @@ func TestMakeManifestServices(t *testing.T) {
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
container := manifest.Containers[0]
container := pod.Spec.Containers[0]
envs := []api.EnvVar{
{
Name: "TEST_SERVICE_HOST",
@ -123,8 +123,7 @@ func TestMakeManifestServices(t *testing.T) {
},
}
if len(container.Env) != len(envs) {
t.Errorf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), manifest)
return
t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod)
}
for ix := range container.Env {
if !reflect.DeepEqual(envs[ix], container.Env[ix]) {
@ -133,7 +132,7 @@ func TestMakeManifestServices(t *testing.T) {
}
}
func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) {
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
@ -149,11 +148,11 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
},
}
factory := &BasicManifestFactory{
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
}
manifest, err := factory.MakeManifest("machine", api.Pod{
pod, err := factory.MakeBoundPod("machine", &api.Pod{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
@ -170,10 +169,10 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
container := manifest.Containers[0]
container := pod.Spec.Containers[0]
envs := []api.EnvVar{
{
@ -210,8 +209,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
}
if len(container.Env) != len(envs) {
t.Errorf("Expected %d env vars, got: %#v", len(envs), manifest)
return
t.Fatalf("Expected %d env vars, got: %#v", len(envs), pod)
}
for ix := range container.Env {
if !reflect.DeepEqual(envs[ix], container.Env[ix]) {