/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
	"context"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/record"
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/status"
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
	utilpointer "k8s.io/utils/pointer"
)

// TCP sockets go through a TIME-WAIT state (default 60 sec) before being freed,
// causing conntrack entries and ephemeral ports to be held for 60 seconds
// even though the probe may have finished in less than 1 second.
// If the rate of probes is higher than the rate at which the OS recycles the ports used,
// probing can consume a considerable number of ephemeral ports or conntrack entries.
// These tests verify that the probes keep working after a certain period; if the probes
// don't close their sockets fast enough, they will start to fail.
// The test creates a TCP or HTTP server to fake a pod. It creates 1 pod with 600 fake
// containers and runs one probe for each of these containers (all the probes come
// from the same process, as in the kubelet, and target the same IP:port) to verify
// that the ephemeral ports are not exhausted.

// The default port range on a normal Linux system has 28232 free ephemeral ports per
// tuple srcIP,srcPort:dstIP:dstPort:Proto: /proc/sys/net/ipv4/ip_local_port_range 32768 60999
// 1 pod x 600 containers/pod x 1 probe/container x 1 req/sec = 600 req/sec
// 600 req/sec x 59 sec = 35400
// The test should run out of ephemeral ports in less than one minute and start failing connections.
// Ref: https://github.com/kubernetes/kubernetes/issues/89898#issuecomment-1383207322

func TestTCPPortExhaustion(t *testing.T) {
	// This test creates a considerable number of connections in a short time
	// and flakes on constrained environments, thus it is skipped by default.
	// The test is left for manual verification or experimentation with new
	// changes on the probes.
	t.Skip("skipping TCP port exhaustion tests")

	const (
		numTestPods   = 1
		numContainers = 600
	)

	tests := []struct {
		name string
		http bool // it can be tcp or http
	}{
		{"TCP", false},
		{"HTTP", true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			testRootDir := ""
			if tempDir, err := ioutil.TempDir("", "kubelet_test."); err != nil {
				t.Fatalf("can't make a temp rootdir: %v", err)
			} else {
				testRootDir = tempDir
			}
			podManager := kubepod.NewBasicPodManager(nil)
			podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
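			// Build a prober manager around a fake clientset and recorder; the three
			// results.NewManager() caches collect the probe results that the goroutine
			// below drains.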
			m := NewManager(
				status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
				results.NewManager(),
				results.NewManager(),
				results.NewManager(),
				nil, // runner
				&record.FakeRecorder{},
			).(*manager)
			defer cleanup(t, m)

			now := time.Now()
			fakePods := make([]*fakePod, numTestPods)
			for i := 0; i < numTestPods; i++ {
				fake, err := newFakePod(tt.http)
				if err != nil {
					t.Fatalf("unexpected error creating fake pod: %v", err)
				}
				defer fake.stop()
				handler := fake.probeHandler()
				fakePods[i] = fake

				pod := v1.Pod{
					ObjectMeta: metav1.ObjectMeta{
						UID:       types.UID(fmt.Sprintf("pod%d", i)),
						Name:      fmt.Sprintf("pod%d", i),
						Namespace: "test",
					},
					Spec: v1.PodSpec{},
					Status: v1.PodStatus{
						Phase:  v1.PodPhase(v1.PodReady),
						PodIPs: []v1.PodIP{{IP: "127.0.0.1"}},
					},
				}
				for j := 0; j < numContainers; j++ {
					// use only liveness probes for simplicity, initial state is success for them
					pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
						Name:          fmt.Sprintf("container%d", j),
						LivenessProbe: newProbe(handler),
					})
					pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
						Name:        fmt.Sprintf("container%d", j),
						ContainerID: fmt.Sprintf("pod%d://container%d", i, j),
						State: v1.ContainerState{
							Running: &v1.ContainerStateRunning{
								StartedAt: metav1.Now(),
							},
						},
						Started: utilpointer.Bool(true),
					})
				}
				podManager.AddPod(&pod)
				m.statusManager.SetPodStatus(&pod, pod.Status)
				m.AddPod(&pod)
			}
			t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))

			ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
			defer cancel()
			var wg sync.WaitGroup

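			// Drain the probe result updates until the 59s context times out;
			// any Failure result fails the test.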
			wg.Add(1)
			go func() {
				defer wg.Done()
				for {
					var result results.Update
					var probeType string
					select {
					case result = <-m.startupManager.Updates():
						probeType = "startup"
					case result = <-m.livenessManager.Updates():
						probeType = "liveness"
					case result = <-m.readinessManager.Updates():
						probeType = "readiness"
					case <-ctx.Done():
						return
					}
					switch result.Result.String() {
					// The test will fail if any of the probes fails
					case "Failure":
						t.Errorf("Failure %s on containerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
					case "UNKNOWN": // startup probes
						t.Logf("UNKNOWN state for %v", result)
					default:
					}
				}
			}()
			wg.Wait()

			// log the number of connections received in each pod for debugging test failures.
			for _, pod := range fakePods {
				n := pod.connections()
				t.Logf("Number of connections %d", n)
			}
		})
	}
}

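// newProbe returns a probe that uses the given handler, firing every second
// with a 1s timeout and a failure threshold of 3.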
func newProbe(handler v1.ProbeHandler) *v1.Probe {
	return &v1.Probe{
		ProbeHandler:     handler,
		TimeoutSeconds:   1,
		PeriodSeconds:    1,
		SuccessThreshold: 1,
		FailureThreshold: 3,
	}
}

// newFakePod runs a server (TCP or HTTP) on a random local port
func newFakePod(httpServer bool) (*fakePod, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, fmt.Errorf("failed to bind: %v", err)
	}
	f := &fakePod{ln: ln, http: httpServer}

	// spawn an HTTP server or a TCP server that counts the number of connections received
	if httpServer {
		var mu sync.Mutex
		visitors := map[string]struct{}{}
		go http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			mu.Lock()
			defer mu.Unlock()
			if _, ok := visitors[r.RemoteAddr]; !ok {
				atomic.AddInt64(&f.numConnection, 1)
				visitors[r.RemoteAddr] = struct{}{}
			}
		}))
	} else {
		go func() {
			for {
				conn, err := ln.Accept()
				if err != nil {
					// exit when the listener is closed
					return
				}
				atomic.AddInt64(&f.numConnection, 1)
				// handle the request without blocking the accept loop
				go func(c net.Conn) {
					defer c.Close()
					// read but swallow the errors since the probe doesn't send data
					buffer := make([]byte, 1024)
					c.Read(buffer)
					// respond on this goroutine's connection, not the loop variable
					c.Write([]byte("Hi back!\n"))
				}(conn)
			}
		}()
	}
	return f, nil
}

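// fakePod fakes a pod's probe target: a local listener plus a counter of the
// connections it has received.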
type fakePod struct {
	ln            net.Listener
	numConnection int64
	http          bool
}

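// probeHandler returns an HTTPGet or TCPSocket handler pointing at the fake
// pod's listener, depending on whether it serves HTTP.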
func (f *fakePod) probeHandler() v1.ProbeHandler {
	port := f.ln.Addr().(*net.TCPAddr).Port
	var handler v1.ProbeHandler
	if f.http {
		handler = v1.ProbeHandler{
			HTTPGet: &v1.HTTPGetAction{
				Host: "127.0.0.1",
				Port: intstr.FromInt(port),
			},
		}
	} else {
		handler = v1.ProbeHandler{
			TCPSocket: &v1.TCPSocketAction{
				Host: "127.0.0.1",
				Port: intstr.FromInt(port),
			},
		}
	}
	return handler
}

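// stop closes the listener, terminating the fake pod's server goroutine.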
func (f *fakePod) stop() {
	f.ln.Close()
}

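// connections returns how many connections the fake pod has received
// (unique remote addresses when serving HTTP).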
func (f *fakePod) connections() int {
	return int(atomic.LoadInt64(&f.numConnection))
}