Merge pull request #60531 from dashpole/memcg_update
Automatic merge from submit-queue (batch tested with PRs 60759, 60531, 60923, 60851, 58717). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

**Subtract inactive_file from usage when setting memcg threshold**

**What this PR does / why we need it**:
Implements the solution for #51745 proposed in https://github.com/kubernetes/community/pull/1451. This is a prerequisite to fixing https://github.com/kubernetes/kubernetes/issues/57901.

**Which issue(s) this PR fixes**:
Fixes #51745

**Special notes for your reviewer**:

**Release note**:
```release-note
NONE
```

/sig node
/priority important-longterm
/kind bug
/assign @sjenning @derekwaynecarr @dchen1107
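The arithmetic behind the fix: the kernel's memcg notifier fires on `memory.usage_in_bytes`, but eviction decisions compare the working set (usage minus reclaimable `inactive_file` page cache) against `capacity - eviction_hard`, so the usage threshold has to be shifted up by `inactive_file` for both conditions to trip at the same point. A standalone sketch of that identity, with hypothetical byte values not taken from the PR:

```go
package main

import "fmt"

func main() {
	// Hypothetical values in the shape reported by the kubelet summary API.
	var (
		usageBytes      uint64 = 6 << 30 // total memcg usage, including page cache
		workingSetBytes uint64 = 4 << 30 // usage minus inactive_file
		availableBytes  uint64 = 4 << 30 // capacity minus working set
		evictionHard    uint64 = 1 << 30 // hard threshold on memory.available
	)
	inactiveFile := usageBytes - workingSetBytes // reclaimable page cache: 2Gi
	capacity := availableBytes + workingSetBytes // 8Gi
	// Notify when usage = capacity - eviction_hard + inactive_file, which is
	// equivalent to working_set (= usage - inactive_file) = capacity - eviction_hard.
	memcgThreshold := capacity - evictionHard + inactiveFile // 9Gi here
	fmt.Printf("memory.usage_in_bytes threshold: %d bytes\n", memcgThreshold)
}
```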
This change is contained in commit a2f1c24254.
```diff
@@ -94,7 +94,6 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
-        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/client-go/tools/record:go_default_library",
     ] + select({
```
```diff
@@ -27,7 +27,6 @@ import (
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     "k8s.io/apimachinery/pkg/util/clock"
-    "k8s.io/apimachinery/pkg/util/wait"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/tools/record"
     apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
@@ -83,8 +82,8 @@ type managerImpl struct {
     resourceToNodeReclaimFuncs map[v1.ResourceName]nodeReclaimFuncs
     // last observations from synchronize
     lastObservations signalObservations
-    // notifiersInitialized indicates if the threshold notifiers have been initialized (i.e. synchronize() has been called once)
-    notifiersInitialized bool
+    // notifierStopCh is a channel used to stop all thresholdNotifiers
+    notifierStopCh ThresholdStopCh
     // dedicatedImageFs indicates if imagefs is on a separate device from the rootfs
     dedicatedImageFs *bool
 }
@@ -114,6 +113,7 @@ func NewManager(
         nodeRef:                      nodeRef,
         nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
         thresholdsFirstObservedAt:    thresholdsObservedAt{},
+        notifierStopCh:               NewInitialStopCh(clock),
         dedicatedImageFs:             nil,
     }
     return manager, manager
@@ -184,15 +184,11 @@ func (m *managerImpl) IsUnderPIDPressure() bool {
     return hasNodeCondition(m.nodeConditions, v1.NodePIDPressure)
 }
 
-func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, observations signalObservations, hard bool, handler thresholdNotifierHandlerFunc) error {
-    for _, threshold := range thresholds {
+func (m *managerImpl) startMemoryThresholdNotifier(summary *statsapi.Summary, hard bool, handler thresholdNotifierHandlerFunc) error {
+    for _, threshold := range m.config.Thresholds {
         if threshold.Signal != evictionapi.SignalMemoryAvailable || hard != isHardEvictionThreshold(threshold) {
             continue
         }
-        observed, found := observations[evictionapi.SignalMemoryAvailable]
-        if !found {
-            continue
-        }
         cgroups, err := cm.GetCgroupSubsystems()
         if err != nil {
             return err
@@ -203,15 +199,23 @@ func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, observatio
             return fmt.Errorf("memory cgroup mount point not found")
         }
         attribute := "memory.usage_in_bytes"
-        quantity := evictionapi.GetThresholdQuantity(threshold.Value, observed.capacity)
-        usageThreshold := resource.NewQuantity(observed.capacity.Value(), resource.DecimalSI)
-        usageThreshold.Sub(*quantity)
+        if summary.Node.Memory == nil || summary.Node.Memory.UsageBytes == nil || summary.Node.Memory.WorkingSetBytes == nil {
+            return fmt.Errorf("summary was incomplete")
+        }
+        // Set threshold on usage to capacity - eviction_hard + inactive_file,
+        // since we want to be notified when working_set = capacity - eviction_hard
+        inactiveFile := resource.NewQuantity(int64(*summary.Node.Memory.UsageBytes-*summary.Node.Memory.WorkingSetBytes), resource.BinarySI)
+        capacity := resource.NewQuantity(int64(*summary.Node.Memory.AvailableBytes+*summary.Node.Memory.WorkingSetBytes), resource.BinarySI)
+        evictionThresholdQuantity := evictionapi.GetThresholdQuantity(threshold.Value, capacity)
+        memcgThreshold := capacity.DeepCopy()
+        memcgThreshold.Sub(*evictionThresholdQuantity)
+        memcgThreshold.Add(*inactiveFile)
         description := fmt.Sprintf("<%s available", formatThresholdValue(threshold.Value))
-        memcgThresholdNotifier, err := NewMemCGThresholdNotifier(cgpath, attribute, usageThreshold.String(), description, handler)
+        memcgThresholdNotifier, err := NewMemCGThresholdNotifier(cgpath, attribute, memcgThreshold.String(), description, handler)
         if err != nil {
             return err
         }
-        go memcgThresholdNotifier.Start(wait.NeverStop)
+        go memcgThresholdNotifier.Start(m.notifierStopCh)
         return nil
     }
     return nil
@@ -247,16 +251,11 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
         return nil
     }
 
-    // make observations and get a function to derive pod usage stats relative to those observations.
-    observations, statsFunc := makeSignalObservations(summary)
-    debugLogObservations("observations", observations)
-
     // attempt to create a threshold notifier to improve eviction response time
-    if m.config.KernelMemcgNotification && !m.notifiersInitialized {
-        glog.Infof("eviction manager attempting to integrate with kernel memcg notification api")
-        m.notifiersInitialized = true
+    if m.config.KernelMemcgNotification && m.notifierStopCh.Reset() {
+        glog.V(4).Infof("eviction manager attempting to integrate with kernel memcg notification api")
         // start soft memory notification
-        err = startMemoryThresholdNotifier(m.config.Thresholds, observations, false, func(desc string) {
+        err = m.startMemoryThresholdNotifier(summary, false, func(desc string) {
             glog.Infof("soft memory eviction threshold crossed at %s", desc)
             // TODO wait grace period for soft memory limit
             m.synchronize(diskInfoProvider, podFunc)
@@ -265,7 +264,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
             glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err)
         }
         // start hard memory notification
-        err = startMemoryThresholdNotifier(m.config.Thresholds, observations, true, func(desc string) {
+        err = m.startMemoryThresholdNotifier(summary, true, func(desc string) {
             glog.Infof("hard memory eviction threshold crossed at %s", desc)
             m.synchronize(diskInfoProvider, podFunc)
         })
@@ -274,6 +273,10 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
         }
     }
 
+    // make observations and get a function to derive pod usage stats relative to those observations.
+    observations, statsFunc := makeSignalObservations(summary)
+    debugLogObservations("observations", observations)
+
     // determine the set of thresholds met independent of grace period
     thresholds = thresholdsMet(thresholds, observations, false)
     debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)
```
```diff
@@ -21,11 +21,13 @@ import (
     "sort"
     "strconv"
     "strings"
+    "sync"
     "time"
 
     "github.com/golang/glog"
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
+    "k8s.io/apimachinery/pkg/util/clock"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/kubernetes/pkg/features"
     statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
@@ -52,6 +54,9 @@ const (
     resourceNodeFs v1.ResourceName = "nodefs"
     // nodefs inodes, number. internal to this module, used to account for local node root filesystem inodes.
     resourceNodeFsInodes v1.ResourceName = "nodefsInodes"
+    // this prevents constantly updating the memcg notifier if synchronize
+    // is run frequently.
+    notifierRefreshInterval = 10 * time.Second
 )
 
 var (
@@ -1080,3 +1085,38 @@ func buildResourceToNodeReclaimFuncs(imageGC ImageGC, containerGC ContainerGC, w
     }
     return resourceToReclaimFunc
 }
+
+// thresholdStopCh is a ThresholdStopCh which can only be closed after notifierRefreshInterval time has passed
+type thresholdStopCh struct {
+    lock      sync.Mutex
+    ch        chan struct{}
+    startTime time.Time
+    // used to track time
+    clock clock.Clock
+}
+
+// NewInitialStopCh returns a ThresholdStopCh which can be closed immediately
+func NewInitialStopCh(clock clock.Clock) ThresholdStopCh {
+    return &thresholdStopCh{ch: make(chan struct{}), clock: clock}
+}
+
+// implements ThresholdStopCh.Reset
+func (t *thresholdStopCh) Reset() (closed bool) {
+    t.lock.Lock()
+    defer t.lock.Unlock()
+    closed = t.clock.Since(t.startTime) > notifierRefreshInterval
+    if closed {
+        // close the old channel and reopen a new one
+        close(t.ch)
+        t.startTime = t.clock.Now()
+        t.ch = make(chan struct{})
+    }
+    return
+}
+
+// implements ThresholdStopCh.Ch
+func (t *thresholdStopCh) Ch() <-chan struct{} {
+    t.lock.Lock()
+    defer t.lock.Unlock()
+    return t.ch
+}
```
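A minimal usage sketch of the Reset/Ch pattern implemented above; `exampleStopChUsage` is a hypothetical helper, not part of the PR, and it assumes the surrounding package's imports (`clock`, `glog`) plus the identifiers this PR adds:

```go
func exampleStopChUsage() {
	stop := NewInitialStopCh(clock.RealClock{})
	if stop.Reset() { // true: a fresh stop channel can be reset immediately
		go func(ch <-chan struct{}) {
			<-ch // unblocks when a later successful Reset closes this channel
			glog.V(4).Infof("previous notifier round stopped")
		}(stop.Ch())
	}
	// Within notifierRefreshInterval (10s), further Resets are refused, so
	// synchronize() cannot churn notifiers on every loop iteration.
	if stop.Reset() {
		glog.Errorf("unexpected: Reset succeeded twice within the interval")
	}
}
```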
```diff
@@ -19,6 +19,7 @@ package eviction
 import (
     "fmt"
     "reflect"
+    "sync"
     "testing"
     "time"
 
@@ -26,6 +27,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/clock"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     api "k8s.io/kubernetes/pkg/apis/core"
     "k8s.io/kubernetes/pkg/features"
@@ -1914,3 +1916,34 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool {
     }
     return true
 }
+
+func TestThresholdStopCh(t *testing.T) {
+    var wg sync.WaitGroup
+    fakeClock := clock.NewFakeClock(time.Now())
+    stop := NewInitialStopCh(fakeClock)
+
+    // Should be able to reset the InitialStopCh right away
+    if !stop.Reset() {
+        t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
+    }
+
+    // Need to wait notifierRefreshInterval before closing
+    if stop.Reset() {
+        t.Errorf("Expected not to be able to close the initialStopCh, but was successful")
+    }
+
+    wg.Add(1)
+    ch := stop.Ch()
+    go func() {
+        defer wg.Done()
+        // wait for the channel to close
+        <-ch
+    }()
+
+    fakeClock.Step(2 * notifierRefreshInterval)
+    if !stop.Reset() {
+        t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
+    }
+    // ensure the Reset() closed the channel
+    wg.Wait()
+}
```
```diff
@@ -67,7 +67,7 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h
             unix.Close(eventfd)
         }
     }()
-    glog.V(2).Infof("eviction: setting notification threshold to %s", threshold)
+    glog.V(3).Infof("eviction: setting notification threshold to %s", threshold)
     config := fmt.Sprintf("%d %d %s", eventfd, watchfd, threshold)
     _, err = unix.Write(controlfd, []byte(config))
     if err != nil {
@@ -82,7 +82,7 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h
     }, nil
 }
 
-func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stopCh <-chan struct{}) {
+func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stop ThresholdStopCh) {
     for {
         buf := make([]byte, 8)
         _, err := unix.Read(eventfd, buf)
@@ -92,19 +92,19 @@ func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stopCh <-chan stru
 
         select {
         case eventCh <- struct{}{}:
-        case <-stopCh:
+        case <-stop.Ch():
             return
         }
     }
 }
 
-func (n *memcgThresholdNotifier) Start(stopCh <-chan struct{}) {
+func (n *memcgThresholdNotifier) Start(stop ThresholdStopCh) {
     eventCh := make(chan struct{})
-    go getThresholdEvents(n.eventfd, eventCh, stopCh)
+    go getThresholdEvents(n.eventfd, eventCh, stop)
     for {
         select {
-        case <-stopCh:
-            glog.V(2).Infof("eviction: stopping threshold notifier")
+        case <-stop.Ch():
+            glog.V(3).Infof("eviction: stopping threshold notifier")
             unix.Close(n.watchfd)
             unix.Close(n.controlfd)
             unix.Close(n.eventfd)
```
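For context, the notifier above drives the cgroup-v1 memcg threshold API: an eventfd is registered by writing `<event_fd> <fd of memory.usage_in_bytes> <threshold>` to `cgroup.event_control`, after which a read on the eventfd blocks until usage crosses the threshold. A stripped-down standalone sketch of that protocol, with a hypothetical mount path and error handling elided:

```go
package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	const cgpath = "/sys/fs/cgroup/memory" // hypothetical memory cgroup mount point
	watchfd, _ := unix.Open(cgpath+"/memory.usage_in_bytes", unix.O_RDONLY, 0)
	controlfd, _ := unix.Open(cgpath+"/cgroup.event_control", unix.O_WRONLY, 0)
	eventfd, _ := unix.Eventfd(0, unix.EFD_CLOEXEC)

	// Register: "<eventfd> <fd of the watched attribute> <threshold in bytes>"
	config := fmt.Sprintf("%d %d %d", eventfd, watchfd, 1<<30)
	unix.Write(controlfd, []byte(config))

	buf := make([]byte, 8)  // eventfd counter is a little-endian uint64
	unix.Read(eventfd, buf) // blocks until usage_in_bytes crosses the threshold
	fmt.Println("memcg threshold crossed")
}
```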
```diff
@@ -132,7 +132,17 @@ type nodeReclaimFuncs []nodeReclaimFunc
 // thresholdNotifierHandlerFunc is a function that takes action in response to a crossed threshold
 type thresholdNotifierHandlerFunc func(thresholdDescription string)
 
+// ThresholdStopCh is an interface for a channel which is closed to stop waiting goroutines.
+// Implementations of ThresholdStopCh must correctly handle concurrent calls to all functions.
+type ThresholdStopCh interface {
+    // Reset closes the channel if it can be closed, and returns true if it was closed.
+    // Reset also creates a new channel.
+    Reset() bool
+    // Ch returns the channel that is closed when Reset() is called
+    Ch() <-chan struct{}
+}
+
 // ThresholdNotifier notifies the user when an attribute crosses a threshold value
 type ThresholdNotifier interface {
-    Start(stopCh <-chan struct{})
+    Start(ThresholdStopCh)
 }
```