Merge pull request #3376 from erictune/channel_api

Kublet watches Pods.
This commit is contained in:
Daniel Smith
2015-01-15 14:48:12 -08:00
7 changed files with 79 additions and 19 deletions

View File

@@ -191,13 +191,13 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault)
standalone.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault)
standalone.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault)
return apiServer.URL
}

View File

@@ -57,7 +57,7 @@ var (
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server")
runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers, --api_servers, and --enable-server")
enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands")
minimumGCAge = flag.Duration("minimum_container_ttl_duration", 1*time.Minute, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
maxContainerCount = flag.Int("maximum_dead_containers_per_container", 5, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.")
@@ -73,15 +73,19 @@ var (
func init() {
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config")
flag.Var(&address, "address", "The IP address for the info server to serve on (set to 0.0.0.0 for all interfaces)")
flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers to publish events to. (ip:port), comma separated.")
flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.")
flag.Var(&clusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
}
func setupRunOnce() {
if *runonce {
// Don't use remote (etcd or apiserver) sources
if len(etcdServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive")
}
if len(apiServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive")
}
if *enableServer {
glog.Infof("--runonce is set, disabling server")
*enableServer = false
@@ -97,6 +101,18 @@ func main() {
verflag.PrintAndExitIfRequested()
// Cluster creation scripts support both kubernetes versions that 1) support kublet watching
// apiserver for pods, and 2) ones that don't. So they ca set both --etcd_servers and
// --api_servers. The current code will ignore the --etcd_servers flag, while older kubelet
// code will use the --etd_servers flag for pods, and use --api_servers for event publising.
//
// TODO(erictune): convert all cloud provider scripts and Google Container Engine to
// use only --api_servers, then delete --etcd_servers flag and the resulting dead code.
if len(etcdServerList) > 0 && len(apiServerList) > 0 {
glog.Infof("Both --etcd_servers and --api_servers are set. Not using etcd source.")
etcdServerList = util.StringList{}
}
setupRunOnce()
if err := util.ApplyOomScoreAdj(*oomScoreAdj); err != nil {

View File

@@ -54,7 +54,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace)
standalone.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace)
}
func newApiClient(addr string, port int) *client.Client {

View File

@@ -43,25 +43,31 @@ func (lw fakePodLW) Watch(resourceVersion string) (watch.Interface, error) {
var _ cache.ListerWatcher = fakePodLW{}
func TestNewSourceApiserver(t *testing.T) {
podv1 := api.Pod{
func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) {
pod1v1 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}}
podv2 := api.Pod{
pod1v2 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}}
pod2 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "q"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}}
expectedBoundPodv1 := api.BoundPod{
expectedBoundPod1v1 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}}
expectedBoundPodv2 := api.BoundPod{
expectedBoundPod1v2 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}}
expectedBoundPod2 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "q", SelfLink: "/api/v1beta1/boundPods/q"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}}
// Setup fake api client.
fakeWatch := watch.NewFake()
lw := fakePodLW{
listResp: &api.PodList{Items: []api.Pod{podv1}},
listResp: &api.PodList{Items: []api.Pod{pod1v1}},
watchResp: fakeWatch,
}
@@ -74,23 +80,54 @@ func TestNewSourceApiserver(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv1)
expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v; Got %#v", expected, update)
}
fakeWatch.Modify(&podv2)
// Add another pod
fakeWatch.Add(&pod2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubelet.PodUpdate)
expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv2)
if !api.Semantic.DeepEqual(expected, update) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
// Could be sorted either of these two ways:
expectedA := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1, expectedBoundPod2)
expectedB := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v1)
if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update)
}
fakeWatch.Delete(&podv2)
// Modify pod1
fakeWatch.Modify(&pod1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubelet.PodUpdate)
expectedA = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v2, expectedBoundPod2)
expectedB = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v2)
if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update)
}
// Delete pod1
fakeWatch.Delete(&pod1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubelet.PodUpdate)
expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
// Delete pod2
fakeWatch.Delete(&pod2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")

View File

@@ -100,6 +100,7 @@ func NewMainKubelet(
hostname: hostname,
dockerClient: dockerClient,
etcdClient: etcdClient,
kubeClient: kubeClient,
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
networkContainerImage: networkContainerImage,

View File

@@ -47,6 +47,7 @@ func init() {
}
func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools.FakeDockerClient) {
// TODO: get rid of fakeEtcdClient and return value.
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeDocker := &dockertools.FakeDockerClient{
RemovedImages: util.StringSet{},
@@ -55,10 +56,11 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools
kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker
kubelet.dockerPuller = &dockertools.FakeDockerPuller{}
kubelet.etcdClient = fakeEtcdClient
kubelet.rootDirectory = "/tmp/kubelet"
kubelet.podWorkers = newPodWorkers()
kubelet.sourceReady = func(source string) bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
return kubelet, fakeEtcdClient, fakeDocker
}

View File

@@ -229,10 +229,14 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel(kubelet.HTTPSource))
}
if !reflect.ValueOf(kc.EtcdClient).IsNil() {
if kc.EtcdClient != nil && !reflect.ValueOf(kc.EtcdClient).IsNil() {
glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster())
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource))
}
if kc.KubeClient != nil && !reflect.ValueOf(kc.KubeClient).IsNil() {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource))
}
return cfg
}