Added kubelet config source from apiserver.

This commit is contained in:
Eric Tune 2015-01-09 23:07:08 -08:00
parent fa152ab3f1
commit 18bcef5235
3 changed files with 185 additions and 0 deletions

View File

@ -0,0 +1,57 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Reads the pod configuration from the Kubernetes apiserver.
package config
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/golang/glog"
)
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(client *client.Client, hostname string, updates chan<- interface{}) {
lw := &cache.ListWatch{
Client: client,
FieldSelector: labels.OneTermEqualSelector("Status.Host", hostname),
Resource: "pods",
}
newSourceApiserverFromLW(lw, updates)
}
// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var bpods []api.BoundPod
for _, o := range objs {
pod := o.(*api.Pod)
bpod := api.BoundPod{}
if err := api.Scheme.Convert(pod, &bpod); err != nil {
glog.Errorf("Unable to interpret Pod from apiserver as a BoundPod: %v: %+v", err, pod)
continue
}
// Make a dummy self link so that references to this bound pod will work.
bpod.SelfLink = "/api/v1beta1/boundPods/" + bpod.Name
bpods = append(bpods, bpod)
}
updates <- kubelet.PodUpdate{bpods, kubelet.SET, kubelet.ApiserverSource}
}
cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send)).Run()
}

View File

@ -0,0 +1,126 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
const hostname string = "mcaa1"
type fakePodLW struct {
listResp runtime.Object
watchResp watch.Interface
}
func (lw fakePodLW) List() (runtime.Object, error) {
return lw.listResp, nil
}
func (lw fakePodLW) Watch(resourceVersion string) (watch.Interface, error) {
return lw.watchResp, nil
}
var _ cache.ListerWatcher = fakePodLW{}
func TestNewSourceApiserver(t *testing.T) {
podv1 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}}
podv2 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}}
expectedBoundPodv1 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}}
expectedBoundPodv2 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}}
// Setup fake api client.
fakeWatch := watch.NewFake()
lw := fakePodLW{
listResp: &api.PodList{Items: []api.Pod{podv1}},
watchResp: fakeWatch,
}
ch := make(chan interface{})
newSourceApiserverFromLW(lw, ch)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv1)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v; Got %#v", expected, update)
}
fakeWatch.Modify(&podv2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubelet.PodUpdate)
expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv2)
if !api.Semantic.DeepEqual(expected, update) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
fakeWatch.Delete(&podv2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubelet.PodUpdate)
expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}
func TestNewSourceApiserverInitialEmptySendsEmptyPodUpdate(t *testing.T) {
// Setup fake api client.
fakeWatch := watch.NewFake()
lw := fakePodLW{
listResp: &api.PodList{Items: []api.Pod{}},
watchResp: fakeWatch,
}
ch := make(chan interface{})
newSourceApiserverFromLW(lw, ch)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v; Got %#v", expected, update)
}
}

View File

@ -46,6 +46,8 @@ const (
HTTPSource = "http"
// Updates received to the kubelet server
ServerSource = "server"
// Updates from Kubernetes API Server
ApiserverSource = "api"
// Updates from all sources
AllSource = "*"
)