mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Merge pull request #22760 from mikedanese/flake-pods
Auto commit by PR queue bot
This commit is contained in:
commit
c9977cc774
@ -46,7 +46,7 @@ type resourceTest struct {
|
|||||||
|
|
||||||
func logPodsOnNodes(c *client.Client, nodeNames []string) {
|
func logPodsOnNodes(c *client.Client, nodeNames []string) {
|
||||||
for _, n := range nodeNames {
|
for _, n := range nodeNames {
|
||||||
podList, err := GetKubeletPods(c, n)
|
podList, err := GetKubeletRunningPods(c, n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Logf("Unable to retrieve kubelet pods for node %v", n)
|
Logf("Unable to retrieve kubelet pods for node %v", n)
|
||||||
continue
|
continue
|
||||||
|
@ -32,7 +32,6 @@ import (
|
|||||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/client/restclient"
|
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/server/stats"
|
"k8s.io/kubernetes/pkg/kubelet/server/stats"
|
||||||
@ -42,11 +41,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// timeout for proxy requests.
|
|
||||||
proxyTimeout = 2 * time.Minute
|
|
||||||
)
|
|
||||||
|
|
||||||
// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
|
// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
|
||||||
// TODO: Get some more structure around the metrics and this type
|
// TODO: Get some more structure around the metrics and this type
|
||||||
type KubeletMetric struct {
|
type KubeletMetric struct {
|
||||||
@ -342,46 +336,9 @@ type usageDataPerContainer struct {
|
|||||||
memWorkSetData []uint64
|
memWorkSetData []uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Performs a get on a node proxy endpoint given the nodename and rest client.
|
|
||||||
func nodeProxyRequest(c *client.Client, node, endpoint string) (restclient.Result, error) {
|
|
||||||
// proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call.
|
|
||||||
// This will leak a goroutine if proxy hangs. #22165
|
|
||||||
subResourceProxyAvailable, err := serverVersionGTE(subResourceServiceAndNodeProxyVersion, c)
|
|
||||||
if err != nil {
|
|
||||||
return restclient.Result{}, err
|
|
||||||
}
|
|
||||||
var result restclient.Result
|
|
||||||
finished := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
if subResourceProxyAvailable {
|
|
||||||
result = c.Get().
|
|
||||||
Resource("nodes").
|
|
||||||
SubResource("proxy").
|
|
||||||
Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
|
|
||||||
Suffix(endpoint).
|
|
||||||
Do()
|
|
||||||
|
|
||||||
} else {
|
|
||||||
result = c.Get().
|
|
||||||
Prefix("proxy").
|
|
||||||
Resource("nodes").
|
|
||||||
Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
|
|
||||||
Suffix(endpoint).
|
|
||||||
Do()
|
|
||||||
}
|
|
||||||
finished <- struct{}{}
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-finished:
|
|
||||||
return result, nil
|
|
||||||
case <-time.After(proxyTimeout):
|
|
||||||
return restclient.Result{}, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrieve metrics from the kubelet server of the given node.
|
// Retrieve metrics from the kubelet server of the given node.
|
||||||
func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
|
func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
|
||||||
client, err := nodeProxyRequest(c, node, "metrics")
|
client, err := NodeProxyRequest(c, node, "metrics")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -408,7 +365,7 @@ func getKubeletMetricsThroughNode(nodeName string) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
|
func getKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
|
||||||
client, err := nodeProxyRequest(c, nodeName, "debug/pprof/heap")
|
client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -423,31 +380,10 @@ func getKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
|
|||||||
return strings.Join(lines[len(lines)-numLines:], "\n"), nil
|
return strings.Join(lines[len(lines)-numLines:], "\n"), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetKubeletPods retrieves the list of running pods on the kubelet. The pods
|
|
||||||
// includes necessary information (e.g., UID, name, namespace for
|
|
||||||
// pods/containers), but do not contain the full spec.
|
|
||||||
func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
|
|
||||||
result := &api.PodList{}
|
|
||||||
client, err := nodeProxyRequest(c, node, "runningpods")
|
|
||||||
if err != nil {
|
|
||||||
return &api.PodList{}, err
|
|
||||||
}
|
|
||||||
if err = client.Into(result); err != nil {
|
|
||||||
return &api.PodList{}, err
|
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func PrintAllKubeletPods(c *client.Client, nodeName string) {
|
func PrintAllKubeletPods(c *client.Client, nodeName string) {
|
||||||
result, err := nodeProxyRequest(c, nodeName, "pods")
|
podList, err := GetKubeletPods(c, nodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Logf("Unable to retrieve kubelet pods for node %v", nodeName)
|
Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
|
||||||
return
|
|
||||||
}
|
|
||||||
podList := &api.PodList{}
|
|
||||||
err = result.Into(podList)
|
|
||||||
if err != nil {
|
|
||||||
Logf("Unable to cast result to pods for node %v", nodeName)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, p := range podList.Items {
|
for _, p := range podList.Items {
|
||||||
|
@ -351,6 +351,28 @@ var _ = Describe("Pods", func() {
|
|||||||
Failf("Failed to delete pod: %v", err)
|
Failf("Failed to delete pod: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
By("verifying the kubelet observed the termination notice")
|
||||||
|
pod, err = podClient.Get(pod.Name)
|
||||||
|
Expect(wait.Poll(time.Second*5, time.Second*30, func() (bool, error) {
|
||||||
|
podList, err := GetKubeletPods(framework.Client, pod.Spec.NodeName)
|
||||||
|
if err != nil {
|
||||||
|
Logf("Unable to retrieve kubelet pods for node %v: %v", pod.Spec.NodeName, err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
for _, kubeletPod := range podList.Items {
|
||||||
|
if pod.Name != kubeletPod.Name {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if kubeletPod.ObjectMeta.DeletionTimestamp == nil {
|
||||||
|
Logf("deletion has not yet been observed")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
Logf("no pod exists with the name we were looking for, assuming the termination request was observed and completed")
|
||||||
|
return true, nil
|
||||||
|
})).NotTo(HaveOccurred(), "kubelet never observed the termination notice")
|
||||||
|
|
||||||
By("verifying pod deletion was observed")
|
By("verifying pod deletion was observed")
|
||||||
deleted := false
|
deleted := false
|
||||||
timeout := false
|
timeout := false
|
||||||
|
@ -56,6 +56,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubectl"
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
|
"k8s.io/kubernetes/pkg/master/ports"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
sshutil "k8s.io/kubernetes/pkg/ssh"
|
sshutil "k8s.io/kubernetes/pkg/ssh"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
@ -3554,6 +3555,70 @@ func GetReadyNodes(f *Framework) (nodes *api.NodeList, err error) {
|
|||||||
return nodes, nil
|
return nodes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// timeout for proxy requests.
|
||||||
|
const proxyTimeout = 2 * time.Minute
|
||||||
|
|
||||||
|
// NodeProxyRequest performs a get on a node proxy endpoint given the nodename and rest client.
|
||||||
|
func NodeProxyRequest(c *client.Client, node, endpoint string) (restclient.Result, error) {
|
||||||
|
// proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call.
|
||||||
|
// This will leak a goroutine if proxy hangs. #22165
|
||||||
|
subResourceProxyAvailable, err := serverVersionGTE(subResourceServiceAndNodeProxyVersion, c)
|
||||||
|
if err != nil {
|
||||||
|
return restclient.Result{}, err
|
||||||
|
}
|
||||||
|
var result restclient.Result
|
||||||
|
finished := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
if subResourceProxyAvailable {
|
||||||
|
result = c.Get().
|
||||||
|
Resource("nodes").
|
||||||
|
SubResource("proxy").
|
||||||
|
Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
|
||||||
|
Suffix(endpoint).
|
||||||
|
Do()
|
||||||
|
|
||||||
|
} else {
|
||||||
|
result = c.Get().
|
||||||
|
Prefix("proxy").
|
||||||
|
Resource("nodes").
|
||||||
|
Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
|
||||||
|
Suffix(endpoint).
|
||||||
|
Do()
|
||||||
|
}
|
||||||
|
finished <- struct{}{}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-finished:
|
||||||
|
return result, nil
|
||||||
|
case <-time.After(proxyTimeout):
|
||||||
|
return restclient.Result{}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetKubeletPods retrieves the list of pods on the kubelet
|
||||||
|
func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
|
||||||
|
return getKubeletPods(c, node, "pods")
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetKubeletRunningPods retrieves the list of running pods on the kubelet. The pods
|
||||||
|
// includes necessary information (e.g., UID, name, namespace for
|
||||||
|
// pods/containers), but do not contain the full spec.
|
||||||
|
func GetKubeletRunningPods(c *client.Client, node string) (*api.PodList, error) {
|
||||||
|
return getKubeletPods(c, node, "runningpods")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getKubeletPods(c *client.Client, node, resource string) (*api.PodList, error) {
|
||||||
|
result := &api.PodList{}
|
||||||
|
client, err := NodeProxyRequest(c, node, resource)
|
||||||
|
if err != nil {
|
||||||
|
return &api.PodList{}, err
|
||||||
|
}
|
||||||
|
if err = client.Into(result); err != nil {
|
||||||
|
return &api.PodList{}, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// LaunchWebserverPod launches a pod serving http on port 8080 to act
|
// LaunchWebserverPod launches a pod serving http on port 8080 to act
|
||||||
// as the target for networking connectivity checks. The ip address
|
// as the target for networking connectivity checks. The ip address
|
||||||
// of the created pod will be returned if the pod is launched
|
// of the created pod will be returned if the pod is launched
|
||||||
|
Loading…
Reference in New Issue
Block a user