From b759f67ee3ba14b71fdb23e3eca84278a0d79d3d Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Fri, 21 Nov 2014 13:14:30 -0800 Subject: [PATCH 1/2] Kublet watches Pods. Added a kubelet config source for watching pods on apiserver. The pods are converted to boundpods for merging with other config sources. The preferred way to create a kubelet is now to pass an apiserver client but not an etcd client. Changed cmd/integration to use apiserver to talk to kubelets. And cmd/kubernetes. Unit, integration, and e2e tests pass, except for a failure of the pd e2e test which was unrelated. --- cmd/integration/integration.go | 4 ++-- cmd/kubelet/kubelet.go | 20 ++++++++++++++++++-- cmd/kubernetes/kubernetes.go | 2 +- pkg/kubelet/kubelet.go | 1 + pkg/kubelet/kubelet_test.go | 4 +++- pkg/standalone/standalone.go | 6 +++++- 6 files changed, 30 insertions(+), 7 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index d3377d07fc0..484fe02ca02 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -192,13 +192,13 @@ func startComponents(manifestURL string) (apiServerURL string) { // Kubelet (localhost) testRootDir := makeTempDirOrDie("kubelet_integ_1.") glog.Infof("Using %s as root dir for kubelet #1", testRootDir) - standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault) + standalone.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. testRootDir = makeTempDirOrDie("kubelet_integ_2.") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) - standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault) + standalone.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault) return apiServer.URL } diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index d9385d9bb70..769653f7f7a 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -56,7 +56,7 @@ var ( allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") - runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server") + runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers, --api_servers, and --enable-server") enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands") minimumGCAge = flag.Duration("minimum_container_ttl_duration", 1*time.Minute, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") maxContainerCount = flag.Int("maximum_dead_containers_per_container", 5, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.") @@ -72,15 +72,19 @@ var ( func init() { flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config") flag.Var(&address, "address", "The IP address for the info server to serve on (set to 0.0.0.0 for all interfaces)") - flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers to publish events to. (ip:port), comma separated.") + flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.") flag.Var(&clusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") } func setupRunOnce() { if *runonce { + // Don't use remote (etcd or apiserver) sources if len(etcdServerList) > 0 { glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive") } + if len(apiServerList) > 0 { + glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive") + } if *enableServer { glog.Infof("--runonce is set, disabling server") *enableServer = false @@ -96,6 +100,18 @@ func main() { verflag.PrintAndExitIfRequested() + // Cluster creation scripts support both kubernetes versions that 1) support kublet watching + // apiserver for pods, and 2) ones that don't. So they ca set both --etcd_servers and + // --api_servers. The current code will ignore the --etcd_servers flag, while older kubelet + // code will use the --etd_servers flag for pods, and use --api_servers for event publising. + // + // TODO(erictune): convert all cloud provider scripts and Google Container Engine to + // use only --api_servers, then delete --etcd_servers flag and the resulting dead code. + if len(etcdServerList) > 0 && len(apiServerList) > 0 { + glog.Infof("Both --etcd_servers and --api_servers are set. Not using etcd source.") + etcdServerList = util.StringList{} + } + setupRunOnce() if err := util.ApplyOomScoreAdj(*oomScoreAdj); err != nil { diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 44ccaaa34ed..61393260d15 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -54,7 +54,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint) - standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace) + standalone.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace) } func newApiClient(addr string, port int) *client.Client { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1f3c0c1c4e3..9014b1aee3c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -99,6 +99,7 @@ func NewMainKubelet( hostname: hostname, dockerClient: dockerClient, etcdClient: etcdClient, + kubeClient: kubeClient, rootDirectory: rootDirectory, resyncInterval: resyncInterval, networkContainerImage: networkContainerImage, diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index ce2101c4a1d..82bb1fdbd8d 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -46,6 +46,7 @@ func init() { } func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools.FakeDockerClient) { + // TODO: get rid of fakeEtcdClient and return value. fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeDocker := &dockertools.FakeDockerClient{ RemovedImages: util.StringSet{}, @@ -54,10 +55,11 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker kubelet.dockerPuller = &dockertools.FakeDockerPuller{} - kubelet.etcdClient = fakeEtcdClient kubelet.rootDirectory = "/tmp/kubelet" kubelet.podWorkers = newPodWorkers() kubelet.sourceReady = func(source string) bool { return true } + kubelet.masterServiceNamespace = api.NamespaceDefault + kubelet.serviceLister = testServiceLister{} return kubelet, fakeEtcdClient, fakeDocker } diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 36da0e6deac..b3d7218dcb3 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -229,10 +229,14 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { glog.Infof("Adding manifest url: %v", kc.ManifestURL) config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel(kubelet.HTTPSource)) } - if !reflect.ValueOf(kc.EtcdClient).IsNil() { + if kc.EtcdClient != nil && !reflect.ValueOf(kc.EtcdClient).IsNil() { glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster()) config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource)) } + if kc.KubeClient != nil && !reflect.ValueOf(kc.KubeClient).IsNil() { + glog.Infof("Watching apiserver") + config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource)) + } return cfg } From 2e002b10956529c1445c8d9389276dc9b040bcf1 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Thu, 15 Jan 2015 13:36:49 -0800 Subject: [PATCH 2/2] Add second pod to test. --- pkg/kubelet/config/apiserver_test.go | 61 ++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/pkg/kubelet/config/apiserver_test.go b/pkg/kubelet/config/apiserver_test.go index c57167b0047..936ff22aaec 100644 --- a/pkg/kubelet/config/apiserver_test.go +++ b/pkg/kubelet/config/apiserver_test.go @@ -43,25 +43,31 @@ func (lw fakePodLW) Watch(resourceVersion string) (watch.Interface, error) { var _ cache.ListerWatcher = fakePodLW{} -func TestNewSourceApiserver(t *testing.T) { - podv1 := api.Pod{ +func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) { + pod1v1 := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}} - podv2 := api.Pod{ + pod1v2 := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}} + pod2 := api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "q"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}} - expectedBoundPodv1 := api.BoundPod{ + expectedBoundPod1v1 := api.BoundPod{ ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}} - expectedBoundPodv2 := api.BoundPod{ + expectedBoundPod1v2 := api.BoundPod{ ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}} + expectedBoundPod2 := api.BoundPod{ + ObjectMeta: api.ObjectMeta{Name: "q", SelfLink: "/api/v1beta1/boundPods/q"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}} // Setup fake api client. fakeWatch := watch.NewFake() lw := fakePodLW{ - listResp: &api.PodList{Items: []api.Pod{podv1}}, + listResp: &api.PodList{Items: []api.Pod{pod1v1}}, watchResp: fakeWatch, } @@ -74,23 +80,54 @@ func TestNewSourceApiserver(t *testing.T) { t.Errorf("Unable to read from channel when expected") } update := got.(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv1) + expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1) if !api.Semantic.DeepEqual(expected, update) { t.Errorf("Expected %#v; Got %#v", expected, update) } - fakeWatch.Modify(&podv2) + // Add another pod + fakeWatch.Add(&pod2) got, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") } update = got.(kubelet.PodUpdate) - expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv2) - if !api.Semantic.DeepEqual(expected, update) { - t.Fatalf("Expected %#v, Got %#v", expected, update) + // Could be sorted either of these two ways: + expectedA := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1, expectedBoundPod2) + expectedB := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v1) + + if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update) } - fakeWatch.Delete(&podv2) + // Modify pod1 + fakeWatch.Modify(&pod1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update = got.(kubelet.PodUpdate) + expectedA = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v2, expectedBoundPod2) + expectedB = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v2) + + if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update) + } + + // Delete pod1 + fakeWatch.Delete(&pod1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update = got.(kubelet.PodUpdate) + expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2) + if !api.Semantic.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } + + // Delete pod2 + fakeWatch.Delete(&pod2) got, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected")