From de75a42cb23ad986765924ea166da386f6d2d5d3 Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Tue, 23 Jun 2015 16:01:12 -0700
Subject: [PATCH 1/2] Add a kubelet /runningpods endpoint

/runningpods returns a list of pods currently running on the kubelet. The
list is composed by examining the container runtime, and may differ from
the desired set of pods the kubelet has been instructed to run. This is
useful for tests to verify that pods are indeed deleted on nodes.
---
 pkg/kubelet/container/runtime.go | 18 ++++++++++++++++
 pkg/kubelet/kubelet.go           | 17 ++++++++++++++++
 pkg/kubelet/server.go            | 35 ++++++++++++++++++++++++++++----
 pkg/kubelet/server_test.go       |  5 +++++
 4 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go
index c0eeaea422c..3403f401c71 100644
--- a/pkg/kubelet/container/runtime.go
+++ b/pkg/kubelet/container/runtime.go
@@ -284,6 +284,24 @@ func (p *Pod) FindContainerByName(containerName string) *Container {
 	return nil
 }
 
+// ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no
+// corresponding field in Pod, the field will not be populated.
+func (p *Pod) ToAPIPod() *api.Pod {
+	var pod api.Pod
+	pod.UID = p.ID
+	pod.Name = p.Name
+	pod.Namespace = p.Namespace
+	pod.Status = p.Status
+
+	for _, c := range p.Containers {
+		var container api.Container
+		container.Name = c.Name
+		container.Image = c.Image
+		pod.Spec.Containers = append(pod.Spec.Containers, container)
+	}
+	return &pod
+}
+
 // IsEmpty returns true if the pod is empty.
 func (p *Pod) IsEmpty() bool {
 	return reflect.DeepEqual(p, &Pod{})
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 4b9e91890eb..4b6411df78e 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1850,6 +1850,23 @@ func (kl *Kubelet) GetPods() []*api.Pod {
 	return kl.podManager.GetPods()
 }
 
+// GetRunningPods returns all pods running on the kubelet by looking at the
+// container runtime cache. This function converts kubecontainer.Pod to
+// api.Pod, so only the fields that exist in both kubecontainer.Pod and
+// api.Pod are considered meaningful.
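+// In particular, labels, annotations, and most of the container spec are
+// not populated, because the container runtime does not track them.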
+func (kl *Kubelet) GetRunningPods() ([]*api.Pod, error) {
+	pods, err := kl.runtimeCache.GetPods()
+	if err != nil {
+		return nil, err
+	}
+
+	apiPods := make([]*api.Pod, 0, len(pods))
+	for _, pod := range pods {
+		apiPods = append(apiPods, pod.ToAPIPod())
+	}
+	return apiPods, nil
+}
+
 func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
 	return kl.podManager.GetPodByFullName(podFullName)
 }
diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go
index 32d0c3a7dd3..7d76a62d081 100644
--- a/pkg/kubelet/server.go
+++ b/pkg/kubelet/server.go
@@ -101,6 +101,7 @@ type HostInterface interface {
 	GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error)
 	GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
 	GetPods() []*api.Pod
+	GetRunningPods() ([]*api.Pod, error)
 	GetPodByName(namespace, name string) (*api.Pod, bool)
 	RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
 	ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
@@ -148,6 +149,8 @@ func (s *Server) InstallDebuggingHandlers() {
 	s.mux.HandleFunc("/logs/", s.handleLogs)
 	s.mux.HandleFunc("/containerLogs/", s.handleContainerLogs)
 	s.mux.Handle("/metrics", prometheus.Handler())
+	// The /runningpods endpoint is used for testing only.
+	s.mux.HandleFunc("/runningpods", s.handleRunningPods)
 
 	s.mux.HandleFunc("/debug/pprof/", pprof.Index)
 	s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
@@ -280,14 +283,38 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
 	}
 }
 
-// handlePods returns a list of pod bound to the Kubelet and their spec
-func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
-	pods := s.host.GetPods()
+// encodePods creates an api.PodList object from pods and returns the encoded
+// PodList.
+func encodePods(pods []*api.Pod) (data []byte, err error) {
 	podList := new(api.PodList)
 	for _, pod := range pods {
 		podList.Items = append(podList.Items, *pod)
 	}
-	data, err := latest.Codec.Encode(podList)
+	return latest.Codec.Encode(podList)
+}
+
+// handlePods returns a list of pods bound to the Kubelet and their spec.
+func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
+	pods := s.host.GetPods()
+	data, err := encodePods(pods)
+	if err != nil {
+		s.error(w, err)
+		return
+	}
+	w.Header().Add("Content-type", "application/json")
+	w.Write(data)
+}
+
+// handleRunningPods returns a list of pods running on the Kubelet. The list
+// is provided by the container runtime, and is different from the list
+// returned by handlePods, which is the set of desired pods to run.
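+// The response body is an api.PodList encoded with the same codec used by
+// handlePods.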
+func (s *Server) handleRunningPods(w http.ResponseWriter, req *http.Request) {
+	pods, err := s.host.GetRunningPods()
+	if err != nil {
+		s.error(w, err)
+		return
+	}
+	data, err := encodePods(pods)
 	if err != nil {
 		s.error(w, err)
 		return
diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go
index 51de9625a75..39c7a4eaa77 100644
--- a/pkg/kubelet/server_test.go
+++ b/pkg/kubelet/server_test.go
@@ -47,6 +47,7 @@ type fakeKubelet struct {
 	rawInfoFunc          func(query *cadvisorApi.ContainerInfoRequest) (map[string]*cadvisorApi.ContainerInfo, error)
 	machineInfoFunc      func() (*cadvisorApi.MachineInfo, error)
 	podsFunc             func() []*api.Pod
+	runningPodsFunc      func() ([]*api.Pod, error)
 	logFunc              func(w http.ResponseWriter, req *http.Request)
 	runFunc              func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
 	containerVersionFunc func() (kubecontainer.Version, error)
@@ -91,6 +92,10 @@ func (fk *fakeKubelet) GetPods() []*api.Pod {
 	return fk.podsFunc()
 }
 
+func (fk *fakeKubelet) GetRunningPods() ([]*api.Pod, error) {
+	return fk.runningPodsFunc()
+}
+
 func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
 	fk.logFunc(w, req)
 }

From a10a6a296e4efa90483febce5f6147b620e84e7b Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Wed, 24 Jun 2015 13:00:56 -0700
Subject: [PATCH 2/2] Add an e2e test to verify that pods are deleted on nodes

The test verifies that the kubelet deletes the pods/containers within a
reasonable time. It queries the kubelet /runningpods endpoint to retrieve
a list of running pods directly. The outline of the test is:

- Create an RC
- Wait until all pods are running (based on the pod status)
- Verify the pods are running by querying /runningpods
- Delete the RC
- Check that all pods are deleted by querying /runningpods
---
 test/e2e/kubelet.go       | 152 ++++++++++++++++++++++++++++++++++++++
 test/e2e/kubelet_stats.go |  28 +++++--
 2 files changed, 174 insertions(+), 6 deletions(-)
 create mode 100644 test/e2e/kubelet.go

diff --git a/test/e2e/kubelet.go b/test/e2e/kubelet.go
new file mode 100644
index 00000000000..ddaa35e16f2
--- /dev/null
+++ b/test/e2e/kubelet.go
@@ -0,0 +1,152 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+const (
+	// Interval to poll /runningpods on a node.
+	pollInterval = 1 * time.Second
+)
+
+// getPodMatches returns the set of pod names on the given node that match the
+// podNamePrefix and namespace.
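+// Errors from querying the node are logged, and an empty set is returned so
+// that the caller can keep polling.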
+func getPodMatches(c *client.Client, nodeName string, podNamePrefix string, namespace string) util.StringSet {
+	matches := util.NewStringSet()
+	Logf("Checking pods on node %v via /runningpods endpoint", nodeName)
+	runningPods, err := GetKubeletPods(c, nodeName)
+	if err != nil {
+		Logf("Error checking running pods on %v: %v", nodeName, err)
+		return matches
+	}
+	for _, pod := range runningPods.Items {
+		if pod.Namespace == namespace && strings.HasPrefix(pod.Name, podNamePrefix) {
+			matches.Insert(pod.Name)
+		}
+	}
+	return matches
+}
+
+// waitTillNPodsRunningOnNodes polls the /runningpods endpoint on the kubelet
+// until it finds targetNumPods pods that match the given criteria (namespace
+// and podNamePrefix). Note that we usually use a label selector to filter
+// pods that belong to the same RC. However, we use podNamePrefix with
+// namespace here because pods returned from /runningpods do not contain the
+// original label information; they are reconstructed by examining the
+// container runtime. In the scope of this test, we do not expect pod naming
+// conflicts, so podNamePrefix should be sufficient to identify the pods.
+func waitTillNPodsRunningOnNodes(c *client.Client, nodeNames util.StringSet, podNamePrefix string, namespace string, targetNumPods int, timeout time.Duration) error {
+	return wait.Poll(pollInterval, timeout, func() (bool, error) {
+		matchCh := make(chan util.StringSet, len(nodeNames))
+		for _, item := range nodeNames.List() {
+			// Launch a goroutine per node to check the pods running on the nodes.
+			nodeName := item
+			go func() {
+				matchCh <- getPodMatches(c, nodeName, podNamePrefix, namespace)
+			}()
+		}
+
+		seen := util.NewStringSet()
+		for i := 0; i < len(nodeNames.List()); i++ {
+			seen = seen.Union(<-matchCh)
+		}
+		if seen.Len() == targetNumPods {
+			return true, nil
+		}
+		Logf("Waiting for %d pods to be running on the nodes; %d are currently running", targetNumPods, seen.Len())
+		return false, nil
+	})
+}
+
+var _ = Describe("Clean up pods on node", func() {
+	var numNodes int
+	var nodeNames util.StringSet
+	framework := NewFramework("kubelet-delete")
+
+	BeforeEach(func() {
+		nodes, err := framework.Client.Nodes().List(labels.Everything(), fields.Everything())
+		expectNoError(err)
+		numNodes = len(nodes.Items)
+		nodeNames = util.NewStringSet()
+		for _, node := range nodes.Items {
+			nodeNames.Insert(node.Name)
+		}
+	})
+
+	type DeleteTest struct {
+		podsPerNode int
+		timeout     time.Duration
+	}
+
+	deleteTests := []DeleteTest{
+		{podsPerNode: 10, timeout: 1 * time.Minute},
+	}
+
+	for _, itArg := range deleteTests {
+		name := fmt.Sprintf(
+			"kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
+		It(name, func() {
+			totalPods := itArg.podsPerNode * numNodes
+
+			By(fmt.Sprintf("Creating an RC of %d pods and waiting until all pods of this RC are running", totalPods))
+			rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(util.NewUUID()))
+
+			Expect(RunRC(RCConfig{
+				Client:    framework.Client,
+				Name:      rcName,
+				Namespace: framework.Namespace.Name,
+				Image:     "gcr.io/google_containers/pause:go",
+				Replicas:  totalPods,
+			})).NotTo(HaveOccurred())
+
+			// Perform a sanity check so that we know all desired pods are
+			// running on the nodes according to the kubelet. The timeout is
+			// set to only 30 seconds here because RunRC already waited for
+			// all pods to transition to the running status.
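+			// A failure here means the kubelet and the API server disagree
+			// about which pods are running.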
+			Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, totalPods,
+				time.Second*30)).NotTo(HaveOccurred())
+
+			By("Deleting the RC")
+			DeleteRC(framework.Client, framework.Namespace.Name, rcName)
+			// Check that the pods really are gone by querying /runningpods on
+			// the node. The /runningpods handler checks the container runtime
+			// (or its cache) and returns a list of running pods. Some possible
+			// causes of failures are:
+			//   - kubelet deadlock
+			//   - a bug in graceful termination (if it is enabled)
+			//   - docker slow to delete pods (or resource problems causing
+			//     slowness)
+			start := time.Now()
+			Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, 0,
+				itArg.timeout)).NotTo(HaveOccurred())
+			Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
+				time.Since(start))
+		})
+	}
+})
diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go
index 049a6c877b7..a973540f475 100644
--- a/test/e2e/kubelet_stats.go
+++ b/test/e2e/kubelet_stats.go
@@ -25,6 +25,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
@@ -171,15 +172,19 @@ func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nod
 	return badMetrics, nil
 }
 
-// Retrieve metrics from the kubelet server of the given node.
-func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
-	metric, err := c.Get().
+// nodeProxyRequest performs a GET on a node proxy endpoint given the node name and REST client.
+func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result {
+	return c.Get().
 		Prefix("proxy").
 		Resource("nodes").
 		Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
-		Suffix("metrics").
-		Do().
-		Raw()
+		Suffix(endpoint).
+		Do()
+}
+
+// Retrieve metrics from the kubelet server of the given node.
+func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
+	metric, err := nodeProxyRequest(c, node, "metrics").Raw()
 	if err != nil {
 		return "", err
 	}
@@ -200,3 +205,14 @@ func getKubeletMetricsThroughNode(nodeName string) (string, error) {
 	}
 	return string(body), nil
 }
+
+// GetKubeletPods retrieves the list of pods running on the kubelet. The pods
+// include necessary information (e.g., UID, name, and namespace for
+// pods/containers), but do not contain the full spec.
+func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
+	result := &api.PodList{}
+	if err := nodeProxyRequest(c, node, "runningpods").Into(result); err != nil {
+		return &api.PodList{}, err
+	}
+	return result, nil
+}
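
A minimal usage sketch, assuming both patches are applied. logRunningPods is
a hypothetical helper (not part of the patches); it would live in the e2e
package, where GetKubeletPods and Logf from the diffs above are available,
and the node name would come from the test.

	// logRunningPods logs each pod that the kubelet on nodeName reports as
	// running via /runningpods. Only the fields populated by ToAPIPod (UID,
	// name, namespace, and container name/image) are meaningful here.
	func logRunningPods(c *client.Client, nodeName string) error {
		pods, err := GetKubeletPods(c, nodeName)
		if err != nil {
			return err
		}
		for _, pod := range pods.Items {
			Logf("node %v is running pod %v/%v (UID: %v)", nodeName, pod.Namespace, pod.Name, pod.UID)
		}
		return nil
	}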