diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index c0eeaea422c..3403f401c71 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -284,6 +284,24 @@ func (p *Pod) FindContainerByName(containerName string) *Container { return nil } +// ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no +// corresponding field in Pod, the field would not be populated. +func (p *Pod) ToAPIPod() *api.Pod { + var pod api.Pod + pod.UID = p.ID + pod.Name = p.Name + pod.Namespace = p.Namespace + pod.Status = p.Status + + for _, c := range p.Containers { + var container api.Container + container.Name = c.Name + container.Image = c.Image + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + return &pod +} + // IsEmpty returns true if the pod is empty. func (p *Pod) IsEmpty() bool { return reflect.DeepEqual(p, &Pod{}) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e56c76ac729..6dc4bbfb1ac 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1851,6 +1851,23 @@ func (kl *Kubelet) GetPods() []*api.Pod { return kl.podManager.GetPods() } +// GetRunningPods returns all pods running on kubelet from looking at the +// container runtime cache. This function converts kubecontainer.Pod to +// api.Pod, so only the fields that exist in both kubecontainer.Pod and +// api.Pod are considered meaningful. 
+func (kl *Kubelet) GetRunningPods() ([]*api.Pod, error) { + pods, err := kl.runtimeCache.GetPods() + if err != nil { + return nil, err + } + + apiPods := make([]*api.Pod, 0, len(pods)) + for _, pod := range pods { + apiPods = append(apiPods, pod.ToAPIPod()) + } + return apiPods, nil +} + func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) { return kl.podManager.GetPodByFullName(podFullName) } diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 32d0c3a7dd3..7d76a62d081 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -101,6 +101,7 @@ type HostInterface interface { GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) GetPods() []*api.Pod + GetRunningPods() ([]*api.Pod, error) GetPodByName(namespace, name string) (*api.Pod, bool) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error @@ -148,6 +149,8 @@ func (s *Server) InstallDebuggingHandlers() { s.mux.HandleFunc("/logs/", s.handleLogs) s.mux.HandleFunc("/containerLogs/", s.handleContainerLogs) s.mux.Handle("/metrics", prometheus.Handler()) + // The /runningpods endpoint is used for testing only. + s.mux.HandleFunc("/runningpods", s.handleRunningPods) s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile) @@ -280,14 +283,38 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { } } -// handlePods returns a list of pod bound to the Kubelet and their spec -func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) { - pods := s.host.GetPods() +// encodePods creates an api.PodList object from pods and returns the encoded +// PodList. 
+func encodePods(pods []*api.Pod) (data []byte, err error) { podList := new(api.PodList) for _, pod := range pods { podList.Items = append(podList.Items, *pod) } - data, err := latest.Codec.Encode(podList) + return latest.Codec.Encode(podList) +} + +// handlePods returns a list of pods bound to the Kubelet and their spec. +func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) { + pods := s.host.GetPods() + data, err := encodePods(pods) + if err != nil { + s.error(w, err) + return + } + w.Header().Add("Content-type", "application/json") + w.Write(data) +} + +// handleRunningPods returns a list of pods running on Kubelet. The list is +// provided by the container runtime, and is different from the list returned +// by handlePods, which is a set of desired pods to run. +func (s *Server) handleRunningPods(w http.ResponseWriter, req *http.Request) { + pods, err := s.host.GetRunningPods() + if err != nil { + s.error(w, err) + return + } + data, err := encodePods(pods) if err != nil { s.error(w, err) return diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 51de9625a75..39c7a4eaa77 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -47,6 +47,7 @@ type fakeKubelet struct { rawInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (map[string]*cadvisorApi.ContainerInfo, error) machineInfoFunc func() (*cadvisorApi.MachineInfo, error) podsFunc func() []*api.Pod + runningPodsFunc func() ([]*api.Pod, error) logFunc func(w http.ResponseWriter, req *http.Request) runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) containerVersionFunc func() (kubecontainer.Version, error) @@ -91,6 +92,10 @@ func (fk *fakeKubelet) GetPods() []*api.Pod { return fk.podsFunc() } +func (fk *fakeKubelet) GetRunningPods() ([]*api.Pod, error) { + return fk.runningPodsFunc() +} + func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { fk.logFunc(w, req) } diff --git 
a/test/e2e/kubelet.go b/test/e2e/kubelet.go new file mode 100644 index 00000000000..ddaa35e16f2 --- /dev/null +++ b/test/e2e/kubelet.go @@ -0,0 +1,152 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "strings" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +const ( + // Interval to poll /runningpods on a node + pollInterval = 1 * time.Second +) + +// getPodMatches returns a set of pod names on the given node that matches the +// podNamePrefix and namespace. 
+func getPodMatches(c *client.Client, nodeName string, podNamePrefix string, namespace string) util.StringSet { + matches := util.NewStringSet() + Logf("Checking pods on node %v via /runningpods endpoint", nodeName) + runningPods, err := GetKubeletPods(c, nodeName) + if err != nil { + Logf("Error checking running pods on %v: %v", nodeName, err) + return matches + } + for _, pod := range runningPods.Items { + if pod.Namespace == namespace && strings.HasPrefix(pod.Name, podNamePrefix) { + matches.Insert(pod.Name) + } + } + return matches +} + +// waitTillNPodsRunningOnNodes polls the /runningpods endpoint on kubelet until +// it finds targetNumPods pods that match the given criteria (namespace and +// podNamePrefix). Note that we usually use label selector to filter pods that +// belong to the same RC. However, we use podNamePrefix with namespace here +// because pods returned from /runningpods do not contain the original label +// information; they are reconstructed by examining the container runtime. In +// the scope of this test, we do not expect pod naming conflicts so +// podNamePrefix should be sufficient to identify the pods. +func waitTillNPodsRunningOnNodes(c *client.Client, nodeNames util.StringSet, podNamePrefix string, namespace string, targetNumPods int, timeout time.Duration) error { + return wait.Poll(pollInterval, timeout, func() (bool, error) { + matchCh := make(chan util.StringSet, len(nodeNames)) + for _, item := range nodeNames.List() { + // Launch a goroutine per node to check the pods running on the nodes. 
+ nodeName := item + go func() { + matchCh <- getPodMatches(c, nodeName, podNamePrefix, namespace) + }() + } + + seen := util.NewStringSet() + for i := 0; i < len(nodeNames.List()); i++ { + seen = seen.Union(<-matchCh) + } + if seen.Len() == targetNumPods { + return true, nil + } + Logf("Waiting for %d pods to be running on the node; %d are currently running;", targetNumPods, seen.Len()) + return false, nil + }) +} + +var _ = Describe("Clean up pods on node", func() { + var numNodes int + var nodeNames util.StringSet + framework := NewFramework("kubelet-delete") + + BeforeEach(func() { + nodes, err := framework.Client.Nodes().List(labels.Everything(), fields.Everything()) + expectNoError(err) + numNodes = len(nodes.Items) + nodeNames = util.NewStringSet() + for _, node := range nodes.Items { + nodeNames.Insert(node.Name) + } + }) + + type DeleteTest struct { + podsPerNode int + timeout time.Duration + } + + deleteTests := []DeleteTest{ + {podsPerNode: 10, timeout: 1 * time.Minute}, + } + + for _, itArg := range deleteTests { + name := fmt.Sprintf( + "kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout) + It(name, func() { + totalPods := itArg.podsPerNode * numNodes + + By(fmt.Sprintf("Creating a RC of %d pods and wait until all pods of this RC are running", totalPods)) + rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(util.NewUUID())) + + Expect(RunRC(RCConfig{ + Client: framework.Client, + Name: rcName, + Namespace: framework.Namespace.Name, + Image: "gcr.io/google_containers/pause:go", + Replicas: totalPods, + })).NotTo(HaveOccurred()) + + // Perform a sanity check so that we know all desired pods are + // running on the nodes according to kubelet. The timeout is set to + // only 30 seconds here because RunRC already waited for all pods to + // transition to the running status. 
+ Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, totalPods, + time.Second*30)).NotTo(HaveOccurred()) + + By("Deleting the RC") + DeleteRC(framework.Client, framework.Namespace.Name, rcName) + // Check that the pods really are gone by querying /runningpods on the + // node. The /runningpods handler checks the container runtime (or its + // cache) and returns a list of running pods. Some possible causes of + // failures are: + // - kubelet deadlock + // - a bug in graceful termination (if it is enabled) + // - docker slow to delete pods (or resource problems causing slowness) + start := time.Now() + Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, 0, + itArg.timeout)).NotTo(HaveOccurred()) + Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames), + time.Since(start)) + }) + } +}) diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go index 049a6c877b7..a973540f475 100644 --- a/test/e2e/kubelet_stats.go +++ b/test/e2e/kubelet_stats.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" @@ -171,15 +172,19 @@ func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nod return badMetrics, nil } -// Retrieve metrics from the kubelet server of the given node. -func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) { - metric, err := c.Get(). +// Performs a get on a node proxy endpoint given the nodename and rest client. +func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result { + return c.Get(). Prefix("proxy"). Resource("nodes"). Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)). - Suffix("metrics"). - Do(). 
- Raw() + Suffix(endpoint). + Do() +} + +// Retrieve metrics from the kubelet server of the given node. +func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) { + metric, err := nodeProxyRequest(c, node, "metrics").Raw() if err != nil { return "", err } @@ -200,3 +205,14 @@ func getKubeletMetricsThroughNode(nodeName string) (string, error) { } return string(body), nil } + +// GetKubeletPods retrieves the list of running pods on the kubelet. The pods +// include necessary information (e.g., UID, name, namespace for +// pods/containers), but do not contain the full spec. +func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) { + result := &api.PodList{} + if err := nodeProxyRequest(c, node, "runningpods").Into(result); err != nil { + return &api.PodList{}, err + } + return result, nil +}