Merge pull request #115143 from aojea/linger

Kubelet TCP/HTTP probes: improve network resources utilization
Kubernetes Prow Robot 2023-01-22 10:00:03 -08:00 committed by GitHub
commit 1e3cc23b9f
5 changed files with 363 additions and 2 deletions

View File

@@ -0,0 +1,271 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
utilpointer "k8s.io/utils/pointer"
)
// TCP sockets go through a TIME-WAIT state (default 60 sec) before being freed,
// causing conntrack entries and ephemeral ports to be held for 60 seconds
// even though the probe may have finished in less than 1 second.
// If the rate of probes is higher than the rate at which the OS recycles the used ports,
// it can consume a considerable number of ephemeral ports or conntrack entries.
// These tests verify that the probes keep working after a certain period; if the probes
// don't close their sockets faster, they will start to fail.
// The test creates a TCP or HTTP server to fake a pod. It creates 1 pod with 600 fake
// containers and runs one probe for each of these containers (all the probes come
// from the same process, as in the Kubelet, and target the same IP:port) to verify
// that the ephemeral ports are not exhausted.
// The default port range on a normal Linux system has 28321 free ephemeral ports per
// tuple srcIP,srcPort:dstIP:dstPort:Proto: /proc/sys/net/ipv4/ip_local_port_range 32768 60999
// 1 pod x 600 containers/pod x 1 probe/container x 1 req/sec = 600 req/sec
// 600 req/sec x 59 sec = 35400 connections
// The test should run out of ephemeral ports in less than one minute and start failing connections.
// Ref: https://github.com/kubernetes/kubernetes/issues/89898#issuecomment-1383207322
func TestTCPPortExhaustion(t *testing.T) {
const (
numTestPods = 1
numContainers = 600
)
if testing.Short() {
t.Skip("skipping TCP port exhaustion in short mode")
}
tests := []struct {
name string
http bool // it can be tcp or http
}{
{"TCP", false},
{"HTTP", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(nil)
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
results.NewManager(),
results.NewManager(),
results.NewManager(),
nil, // runner
&record.FakeRecorder{},
).(*manager)
defer cleanup(t, m)
now := time.Now()
fakePods := make([]*fakePod, numTestPods)
for i := 0; i < numTestPods; i++ {
fake, err := newFakePod(tt.http)
if err != nil {
t.Fatalf("unexpected error creating fake pod: %v", err)
}
defer fake.stop()
handler := fake.probeHandler()
fakePods[i] = fake
pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(fmt.Sprintf("pod%d", i)),
Name: fmt.Sprintf("pod%d", i),
Namespace: "test",
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Phase: v1.PodPhase(v1.PodReady),
PodIPs: []v1.PodIP{{IP: "127.0.0.1"}},
},
}
for j := 0; j < numContainers; j++ {
// use only liveness probes for simplicity, initial state is success for them
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: fmt.Sprintf("container%d", j),
LivenessProbe: newProbe(handler),
})
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: fmt.Sprintf("container%d", j),
ContainerID: fmt.Sprintf("pod%d://container%d", i, j),
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{
StartedAt: metav1.Now(),
},
},
Started: utilpointer.Bool(true),
})
}
podManager.AddPod(&pod)
m.statusManager.SetPodStatus(&pod, pod.Status)
m.AddPod(&pod)
}
t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))
ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
var result results.Update
var probeType string
select {
case result = <-m.startupManager.Updates():
probeType = "startup"
case result = <-m.livenessManager.Updates():
probeType = "liveness"
case result = <-m.readinessManager.Updates():
probeType = "readiness"
case <-ctx.Done():
return
}
switch result.Result.String() {
// The test will fail if any of the probes fails
case "Failure":
t.Errorf("Failure %s on contantinerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
case "UNKNOWN": // startup probes
t.Logf("UNKNOWN state for %v", result)
default:
}
}
}()
wg.Wait()
// log the number of connections received in each pod for debugging test failures.
for _, pod := range fakePods {
n := pod.connections()
t.Logf("Number of connections %d", n)
}
})
}
}
func newProbe(handler v1.ProbeHandler) *v1.Probe {
return &v1.Probe{
ProbeHandler: handler,
TimeoutSeconds: 1,
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
}
}
// newFakePod runs a server (TCP or HTTP) on a random port
func newFakePod(httpServer bool) (*fakePod, error) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("failed to bind: %v", err)
}
f := &fakePod{ln: ln, http: httpServer}
// spawn an HTTP server or a TCP server that counts the number of connections received
if httpServer {
var mu sync.Mutex
visitors := map[string]struct{}{}
go http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
if _, ok := visitors[r.RemoteAddr]; !ok {
atomic.AddInt64(&f.numConnection, 1)
visitors[r.RemoteAddr] = struct{}{}
}
}))
} else {
go func() {
for {
conn, err := ln.Accept()
if err != nil {
// exit when the listener is closed
return
}
atomic.AddInt64(&f.numConnection, 1)
// handle the request without blocking
go func(c net.Conn) {
defer c.Close()
// read but swallow the errors since the probe doesn't send data
buffer := make([]byte, 1024)
c.Read(buffer)
// respond
conn.Write([]byte("Hi back!\n"))
}(conn)
}
}()
}
return f, nil
}
type fakePod struct {
ln net.Listener
numConnection int64
http bool
}
func (f *fakePod) probeHandler() v1.ProbeHandler {
port := f.ln.Addr().(*net.TCPAddr).Port
var handler v1.ProbeHandler
if f.http {
handler = v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Host: "127.0.0.1",
Port: intstr.FromInt(port),
},
}
} else {
handler = v1.ProbeHandler{
TCPSocket: &v1.TCPSocketAction{
Host: "127.0.0.1",
Port: intstr.FromInt(port),
},
}
}
return handler
}
func (f *fakePod) stop() {
f.ln.Close()
}
func (f *fakePod) connections() int {
return int(atomic.LoadInt64(&f.numConnection))
}
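
For reference, a minimal Linux-only sketch (not part of this commit) that reads the same sysctl cited in the test comment above and prints the per-tuple ephemeral port budget; the file path comes from the comment, everything else is illustrative:

package main

import (
    "fmt"
    "os"
    "strconv"
    "strings"
)

func main() {
    // The test comment refers to this sysctl; on most Linux systems it reads "32768 60999".
    data, err := os.ReadFile("/proc/sys/net/ipv4/ip_local_port_range")
    if err != nil {
        panic(err)
    }
    fields := strings.Fields(string(data))
    lo, _ := strconv.Atoi(fields[0])
    hi, _ := strconv.Atoi(fields[1])
    // At ~600 probes/sec, a budget of roughly 28k ports is consumed in under a minute
    // if every finished probe leaves its socket in TIME-WAIT for 60 seconds.
    fmt.Printf("ephemeral ports per (srcIP, dstIP:dstPort, proto) tuple: %d\n", hi-lo+1)
}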

View File

@@ -0,0 +1,42 @@
//go:build !windows
// +build !windows
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package probe
import (
"net"
"syscall"
)
// ProbeDialer returns a dialer optimized for probes to avoid lingering sockets in the TIME-WAIT state.
// The dialer reduces the TIME-WAIT period to 1 second instead of the OS default of 60 seconds.
// Using 1 second instead of 0 because setting the SO_LINGER socket option to 0 causes pending data to be
// discarded and the connection to be aborted with an RST rather than for the pending data to be
// transmitted and the connection closed cleanly with a FIN.
// Ref: https://issues.k8s.io/89898
func ProbeDialer() *net.Dialer {
dialer := &net.Dialer{
Control: func(network, address string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 1})
})
},
}
return dialer
}
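
A rough usage sketch (not code from this commit; the 127.0.0.1:8080 address and /healthz path are made-up placeholders) showing how this dialer slots into both probe paths, mirroring the http.go and tcp.go hunks further below:

package main

import (
    "net/http"
    "time"

    "k8s.io/kubernetes/pkg/probe"
)

func main() {
    // Raw TCP check, as DoTCPProbe does after this change.
    d := probe.ProbeDialer()
    d.Timeout = time.Second
    if conn, err := d.Dial("tcp", "127.0.0.1:8080"); err == nil {
        // With SO_LINGER set to 1s, the port is reclaimed quickly instead of
        // sitting in TIME-WAIT for the OS default of 60s.
        conn.Close()
    }

    // HTTP check: the same dialer feeds the transport's DialContext,
    // as the httpProber transport does after this change.
    transport := &http.Transport{
        DisableKeepAlives: true,
        DialContext:       probe.ProbeDialer().DialContext,
    }
    client := &http.Client{Transport: transport, Timeout: time.Second}
    if resp, err := client.Get("http://127.0.0.1:8080/healthz"); err == nil {
        resp.Body.Close()
    }
}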

View File

@@ -0,0 +1,42 @@
//go:build windows
// +build windows
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package probe
import (
"net"
"syscall"
)
// ProbeDialer returns a dialer optimized for probes to avoid lingering sockets in the TIME-WAIT state.
// The dialer reduces the TIME-WAIT period to 1 second instead of the OS default of 60 seconds.
// Using 1 second instead of 0 because setting the SO_LINGER socket option to 0 causes pending data to be
// discarded and the connection to be aborted with an RST rather than for the pending data to be
// transmitted and the connection closed cleanly with a FIN.
// Ref: https://issues.k8s.io/89898
func ProbeDialer() *net.Dialer {
dialer := &net.Dialer{
Control: func(network, address string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
syscall.SetsockoptLinger(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 1})
})
},
}
return dialer
}

View File

@@ -53,7 +53,11 @@ func NewWithTLSConfig(config *tls.Config, followNonLocalRedirects bool) Prober {
DisableKeepAlives: true,
Proxy: http.ProxyURL(nil),
DisableCompression: true, // removes Accept-Encoding header
// DialContext creates unencrypted TCP connections
// and is also used by the transport for HTTPS connections
DialContext: probe.ProbeDialer().DialContext,
})
return httpProber{transport, followNonLocalRedirects}
}
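
For context: when DialTLSContext is not set on an http.Transport, Go's net/http establishes the underlying TCP connection through DialContext and performs the TLS handshake on top of it, which is why HTTPS probes also pick up the SO_LINGER setting from ProbeDialer.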

View File

@@ -38,7 +38,7 @@ type Prober interface {
type tcpProber struct{}
-// Probe returns a ProbeRunner capable of running an TCP check.
+// Probe checks that a TCP connection to the address can be opened.
func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
}
@@ -48,7 +48,9 @@ func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
// If the socket fails to open, it returns Failure.
// This is exported because some other packages may want to do direct TCP probes.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
-conn, err := net.DialTimeout("tcp", addr, timeout)
+d := probe.ProbeDialer()
+d.Timeout = timeout
+conn, err := d.Dial("tcp", addr)
if err != nil {
// Convert errors to failures to handle timeouts.
return probe.Failure, err.Error(), nil