diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go new file mode 100644 index 00000000000..113a5ec800b --- /dev/null +++ b/pkg/kubelet/config/apiserver.go @@ -0,0 +1,57 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from the Kubernetes apiserver. +package config + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/golang/glog" +) + +// NewSourceApiserver creates a config source that watches and pulls from the apiserver. +func NewSourceApiserver(client *client.Client, hostname string, updates chan<- interface{}) { + lw := &cache.ListWatch{ + Client: client, + FieldSelector: labels.OneTermEqualSelector("Status.Host", hostname), + Resource: "pods", + } + newSourceApiserverFromLW(lw, updates) +} + +// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver. +func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) { + send := func(objs []interface{}) { + var bpods []api.BoundPod + for _, o := range objs { + pod := o.(*api.Pod) + bpod := api.BoundPod{} + if err := api.Scheme.Convert(pod, &bpod); err != nil { + glog.Errorf("Unable to interpret Pod from apiserver as a BoundPod: %v: %+v", err, pod) + continue + } + // Make a dummy self link so that references to this bound pod will work. + bpod.SelfLink = "/api/v1beta1/boundPods/" + bpod.Name + bpods = append(bpods, bpod) + } + updates <- kubelet.PodUpdate{bpods, kubelet.SET, kubelet.ApiserverSource} + } + cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send)).Run() +} diff --git a/pkg/kubelet/config/apiserver_test.go b/pkg/kubelet/config/apiserver_test.go new file mode 100644 index 00000000000..c57167b0047 --- /dev/null +++ b/pkg/kubelet/config/apiserver_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +const hostname string = "mcaa1" + +type fakePodLW struct { + listResp runtime.Object + watchResp watch.Interface +} + +func (lw fakePodLW) List() (runtime.Object, error) { + return lw.listResp, nil +} + +func (lw fakePodLW) Watch(resourceVersion string) (watch.Interface, error) { + return lw.watchResp, nil +} + +var _ cache.ListerWatcher = fakePodLW{} + +func TestNewSourceApiserver(t *testing.T) { + podv1 := api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "p"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}} + podv2 := api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "p"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}} + + expectedBoundPodv1 := api.BoundPod{ + ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}} + expectedBoundPodv2 := api.BoundPod{ + ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}} + + // Setup fake api client. + fakeWatch := watch.NewFake() + lw := fakePodLW{ + listResp: &api.PodList{Items: []api.Pod{podv1}}, + watchResp: fakeWatch, + } + + ch := make(chan interface{}) + + newSourceApiserverFromLW(lw, ch) + + got, ok := <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update := got.(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv1) + if !api.Semantic.DeepEqual(expected, update) { + t.Errorf("Expected %#v; Got %#v", expected, update) + } + + fakeWatch.Modify(&podv2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update = got.(kubelet.PodUpdate) + expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv2) + if !api.Semantic.DeepEqual(expected, update) { + t.Fatalf("Expected %#v, Got %#v", expected, update) + } + + fakeWatch.Delete(&podv2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update = got.(kubelet.PodUpdate) + expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource) + if !api.Semantic.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestNewSourceApiserverInitialEmptySendsEmptyPodUpdate(t *testing.T) { + // Setup fake api client. + fakeWatch := watch.NewFake() + lw := fakePodLW{ + listResp: &api.PodList{Items: []api.Pod{}}, + watchResp: fakeWatch, + } + + ch := make(chan interface{}) + + newSourceApiserverFromLW(lw, ch) + + got, ok := <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update := got.(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource) + if !api.Semantic.DeepEqual(expected, update) { + t.Errorf("Expected %#v; Got %#v", expected, update) + } +} diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index 28787fb0ac1..870d5a7fa6c 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -46,6 +46,8 @@ const ( HTTPSource = "http" // Updates received to the kubelet server ServerSource = "server" + // Updates from Kubernetes API Server + ApiserverSource = "api" // Updates from all sources AllSource = "*" )