Merge pull request #64643 from dashpole/memcg_poll
Automatic merge from submit-queue (batch tested with PRs 64503, 64903, 64643, 64987). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Use unix.EpollWait to determine when memcg events are available to be Read

**What this PR does / why we need it**: This fixes a file descriptor leak introduced in https://github.com/kubernetes/kubernetes/pull/60531 when the `--experimental-kernel-memcg-notification` kubelet flag is enabled. The root of the issue is that `unix.Read` blocks indefinitely when reading from an event file descriptor and there is nothing to read. Since we periodically refresh the memcg notifications, these blocked reads accumulate until the memcg threshold is crossed, at which point they all complete. However, if the node never comes under memory pressure, the node can run out of file descriptors.

This PR changes the eviction manager to use `unix.EpollWait` to wait, with a 10 second timeout, for events to become available on the eventfd. We only read from the eventfd when there is an event available to be read, preventing an accumulation of blocked `unix.Read` goroutines and allowing the event file descriptors to be reclaimed by the kernel.

This PR also splits the creation and the updating of the memcg threshold into separate steps, and performs creation before starting the periodic synchronize calls. It also moves the logic of configuring memory thresholds into a separate file, memory_threshold_notifier.go. Finally, it reverts https://github.com/kubernetes/kubernetes/pull/64582, as the underlying leak that caused us to disable it for testing is fixed here.

Fixes #62808

**Release note**:
```release-note
NONE
```
/sig node
/kind bug
/priority critical-urgent
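For readers skimming the diff below: the essence of the fix is to poll the eventfd for readiness (with a timeout) before issuing a read, so no goroutine is ever parked forever inside `unix.Read`. The following is only an illustrative sketch of that pattern, assuming Linux and simplified error handling; the PR's actual implementation is the `linuxCgroupNotifier` code further down.

```go
// Sketch of the poll-then-read pattern adopted by this PR (not the PR's exact code).
package main

import (
	"fmt"
	"time"

	"golang.org/x/sys/unix"
)

func main() {
	eventfd, _ := unix.Eventfd(0, unix.EFD_CLOEXEC)
	epfd, _ := unix.EpollCreate1(0)
	defer unix.Close(eventfd)
	defer unix.Close(epfd)
	// Register interest in readability of the eventfd.
	unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, eventfd, &unix.EpollEvent{Fd: int32(eventfd), Events: unix.EPOLLIN})

	events := make([]unix.EpollEvent, 1)
	// The 10s timeout mirrors the notifierRefreshInterval used in the PR.
	n, err := unix.EpollWait(epfd, events, int(10*time.Second/time.Millisecond))
	if err != nil || n == 0 {
		fmt.Println("no event; nothing left blocked in Read")
		return
	}
	buf := make([]byte, 8) // a successful eventfd read always returns 8 bytes
	_, _ = unix.Read(eventfd, buf)
	fmt.Println("event consumed")
}
```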
This commit is contained in: commit 8e03228c1a
@@ -191,6 +191,12 @@ TEST_CLUSTER_RESYNC_PERIOD="${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m
TEST_CLUSTER_API_CONTENT_TYPE="${TEST_CLUSTER_API_CONTENT_TYPE:-}"
KUBELET_TEST_ARGS="${KUBELET_TEST_ARGS:-} --serialize-image-pulls=false ${TEST_CLUSTER_API_CONTENT_TYPE}"
if [[ "${NODE_OS_DISTRIBUTION}" == "gci" ]] || [[ "${NODE_OS_DISTRIBUTION}" == "ubuntu" ]] || [[ "${NODE_OS_DISTRIBUTION}" == "custom" ]]; then
  NODE_KUBELET_TEST_ARGS="${NODE_KUBELET_TEST_ARGS:-} --experimental-kernel-memcg-notification=true"
fi
if [[ "${MASTER_OS_DISTRIBUTION}" == "gci" ]] || [[ "${MASTER_OS_DISTRIBUTION}" == "ubuntu" ]]; then
  MASTER_KUBELET_TEST_ARGS="${MASTER_KUBELET_TEST_ARGS:-} --experimental-kernel-memcg-notification=true"
fi
APISERVER_TEST_ARGS="${APISERVER_TEST_ARGS:-} --vmodule=httplog=3 --runtime-config=extensions/v1beta1,scheduling.k8s.io/v1alpha1,settings.k8s.io/v1alpha1 ${TEST_CLUSTER_DELETE_COLLECTION_WORKERS} ${TEST_CLUSTER_MAX_REQUESTS_INFLIGHT}"
CONTROLLER_MANAGER_TEST_ARGS="${CONTROLLER_MANAGER_TEST_ARGS:-} ${TEST_CLUSTER_RESYNC_PERIOD} ${TEST_CLUSTER_API_CONTENT_TYPE}"
SCHEDULER_TEST_ARGS="${SCHEDULER_TEST_ARGS:-} ${TEST_CLUSTER_API_CONTENT_TYPE}"
@@ -11,6 +11,8 @@ go_test(
    srcs = [
        "eviction_manager_test.go",
        "helpers_test.go",
        "memory_threshold_notifier_test.go",
        "mock_threshold_notifier_test.go",
    ],
    embed = [":go_default_library"],
    deps = [
@@ -20,6 +22,7 @@ go_test(
        "//pkg/kubelet/eviction/api:go_default_library",
        "//pkg/kubelet/lifecycle:go_default_library",
        "//pkg/kubelet/types:go_default_library",
        "//vendor/github.com/stretchr/testify/mock:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -36,6 +39,7 @@ go_library(
        "doc.go",
        "eviction_manager.go",
        "helpers.go",
        "memory_threshold_notifier.go",
        "types.go",
    ] + select({
        "@io_bazel_rules_go//go/platform:android": [
@@ -19,7 +19,6 @@ package eviction
import (
	"fmt"
	"sort"
	"strconv"
	"sync"
	"time"

@@ -35,7 +34,6 @@ import (
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	"k8s.io/kubernetes/pkg/features"
	statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -85,10 +83,12 @@ type managerImpl struct {
	signalToNodeReclaimFuncs map[evictionapi.Signal]nodeReclaimFuncs
	// last observations from synchronize
	lastObservations signalObservations
	// notifierStopCh is a channel used to stop all thresholdNotifiers
	notifierStopCh ThresholdStopCh
	// dedicatedImageFs indicates if imagefs is on a separate device from the rootfs
	dedicatedImageFs *bool
	// thresholdNotifiers is a list of memory threshold notifiers which each notify for a memory eviction threshold
	thresholdNotifiers []ThresholdNotifier
	// thresholdsLastUpdated is the last time the thresholdNotifiers were updated.
	thresholdsLastUpdated time.Time
}

// ensure it implements the required interface
@@ -116,8 +116,8 @@ func NewManager(
	nodeRef: nodeRef,
	nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
	thresholdsFirstObservedAt: thresholdsObservedAt{},
	notifierStopCh: NewInitialStopCh(clock),
	dedicatedImageFs: nil,
	thresholdNotifiers: []ThresholdNotifier{},
	}
	return manager, manager
}
@@ -157,12 +157,29 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
	return lifecycle.PodAdmitResult{
		Admit: false,
		Reason: Reason,
		Message: fmt.Sprintf(message, m.nodeConditions),
		Message: fmt.Sprintf(nodeLowMessageFmt, m.nodeConditions),
	}
}

// Start starts the control loop to observe and response to low compute resources.
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
	thresholdHandler := func(message string) {
		glog.Infof(message)
		m.synchronize(diskInfoProvider, podFunc)
	}
	if m.config.KernelMemcgNotification {
		for _, threshold := range m.config.Thresholds {
			if threshold.Signal == evictionapi.SignalMemoryAvailable || threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable {
				notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, thresholdHandler)
				if err != nil {
					glog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err)
				} else {
					go notifier.Start()
					m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
				}
			}
		}
	}
	// start the eviction manager monitoring
	go func() {
		for {
@@ -197,51 +214,6 @@ func (m *managerImpl) IsUnderPIDPressure() bool {
	return hasNodeCondition(m.nodeConditions, v1.NodePIDPressure)
}

func (m *managerImpl) startMemoryThresholdNotifier(summary *statsapi.Summary, hard, allocatable bool, handler thresholdNotifierHandlerFunc) error {
	for _, threshold := range m.config.Thresholds {
		if threshold.Signal != evictionapi.SignalMemoryAvailable || hard != isHardEvictionThreshold(threshold) {
			continue
		}
		cgroups, err := cm.GetCgroupSubsystems()
		if err != nil {
			return err
		}
		cgpath, found := cgroups.MountPoints["memory"]
		if !found || len(cgpath) == 0 {
			return fmt.Errorf("memory cgroup mount point not found")
		}
		attribute := "memory.usage_in_bytes"
		memoryStats := summary.Node.Memory
		if allocatable {
			cgpath += m.config.PodCgroupRoot
			allocatableContainer, err := getSysContainer(summary.Node.SystemContainers, statsapi.SystemContainerPods)
			if err != nil {
				return err
			}
			memoryStats = allocatableContainer.Memory
		}
		if memoryStats == nil || memoryStats.UsageBytes == nil || memoryStats.WorkingSetBytes == nil || memoryStats.AvailableBytes == nil {
			return fmt.Errorf("summary was incomplete")
		}
		// Set threshold on usage to capacity - eviction_hard + inactive_file,
		// since we want to be notified when working_set = capacity - eviction_hard
		inactiveFile := resource.NewQuantity(int64(*memoryStats.UsageBytes-*memoryStats.WorkingSetBytes), resource.BinarySI)
		capacity := resource.NewQuantity(int64(*memoryStats.AvailableBytes+*memoryStats.WorkingSetBytes), resource.BinarySI)
		evictionThresholdQuantity := evictionapi.GetThresholdQuantity(threshold.Value, capacity)
		memcgThreshold := capacity.DeepCopy()
		memcgThreshold.Sub(*evictionThresholdQuantity)
		memcgThreshold.Add(*inactiveFile)
		description := fmt.Sprintf("<%s available", formatThresholdValue(threshold.Value))
		memcgThresholdNotifier, err := NewMemCGThresholdNotifier(cgpath, attribute, strconv.FormatInt(memcgThreshold.Value(), 10), description, handler)
		if err != nil {
			return err
		}
		go memcgThresholdNotifier.Start(m.notifierStopCh)
		return nil
	}
	return nil
}

// synchronize is the main control loop that enforces eviction thresholds.
// Returns the pod that was killed, or nil if no pod was killed.
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
@@ -272,41 +244,12 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
		return nil
	}

	// attempt to create a threshold notifier to improve eviction response time
	if m.config.KernelMemcgNotification && m.notifierStopCh.Reset() {
		glog.V(4).Infof("eviction manager attempting to integrate with kernel memcg notification api")
		// start soft memory notification
		err = m.startMemoryThresholdNotifier(summary, false, false, func(desc string) {
			glog.Infof("soft memory eviction threshold crossed at %s", desc)
			// TODO wait grace period for soft memory limit
			m.synchronize(diskInfoProvider, podFunc)
		})
		if err != nil {
			glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err)
		} // start soft memory notification
		err = m.startMemoryThresholdNotifier(summary, false, true, func(desc string) {
			glog.Infof("soft allocatable memory eviction threshold crossed at %s", desc)
			// TODO wait grace period for soft memory limit
			m.synchronize(diskInfoProvider, podFunc)
		})
		if err != nil {
			glog.Warningf("eviction manager: failed to create allocatable soft memory threshold notifier: %v", err)
		}
		// start hard memory notification
		err = m.startMemoryThresholdNotifier(summary, true, false, func(desc string) {
			glog.Infof("hard memory eviction threshold crossed at %s", desc)
			m.synchronize(diskInfoProvider, podFunc)
		})
		if err != nil {
			glog.Warningf("eviction manager: failed to create hard memory threshold notifier: %v", err)
		}
		// start hard memory notification
		err = m.startMemoryThresholdNotifier(summary, true, true, func(desc string) {
			glog.Infof("hard allocatable memory eviction threshold crossed at %s", desc)
			m.synchronize(diskInfoProvider, podFunc)
		})
		if err != nil {
			glog.Warningf("eviction manager: failed to create hard allocatable memory threshold notifier: %v", err)
	if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
		m.thresholdsLastUpdated = m.clock.Now()
		for _, notifier := range m.thresholdNotifiers {
			if err := notifier.UpdateThreshold(summary); err != nil {
				glog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err)
			}
		}
	}

@@ -535,7 +478,7 @@ func (m *managerImpl) emptyDirLimitEviction(podStats statsapi.PodStats, pod *v1.
	used := podVolumeUsed[pod.Spec.Volumes[i].Name]
	if used != nil && size != nil && size.Sign() == 1 && used.Cmp(*size) > 0 {
		// the emptyDir usage exceeds the size limit, evict the pod
		return m.evictPod(pod, 0, fmt.Sprintf(emptyDirMessage, pod.Spec.Volumes[i].Name, size.String()), nil)
		return m.evictPod(pod, 0, fmt.Sprintf(emptyDirMessageFmt, pod.Spec.Volumes[i].Name, size.String()), nil)
	}
	}
}
@@ -567,7 +510,7 @@ func (m *managerImpl) podEphemeralStorageLimitEviction(podStats statsapi.PodStat
	podEphemeralStorageLimit := podLimits[v1.ResourceEphemeralStorage]
	if podEphemeralStorageTotalUsage.Cmp(podEphemeralStorageLimit) > 0 {
		// the total usage of pod exceeds the total size limit of containers, evict the pod
		return m.evictPod(pod, 0, fmt.Sprintf(podEphemeralStorageMessage, podEphemeralStorageLimit.String()), nil)
		return m.evictPod(pod, 0, fmt.Sprintf(podEphemeralStorageMessageFmt, podEphemeralStorageLimit.String()), nil)
	}
	return false
}
@@ -589,7 +532,7 @@ func (m *managerImpl) containerEphemeralStorageLimitEviction(podStats statsapi.P

	if ephemeralStorageThreshold, ok := thresholdsMap[containerStat.Name]; ok {
		if ephemeralStorageThreshold.Cmp(*containerUsed) < 0 {
			return m.evictPod(pod, 0, fmt.Sprintf(containerEphemeralStorageMessage, containerStat.Name, ephemeralStorageThreshold.String()), nil)
			return m.evictPod(pod, 0, fmt.Sprintf(containerEphemeralStorageMessageFmt, containerStat.Name, ephemeralStorageThreshold.String()), nil)
		}
	}
@@ -17,6 +17,7 @@ limitations under the License.
package eviction

import (
	"fmt"
	"testing"
	"time"

@@ -1434,3 +1435,74 @@ func TestAllocatableMemoryPressure(t *testing.T) {
		}
	}
}

func TestUpdateMemcgThreshold(t *testing.T) {
	activePodsFunc := func() []*v1.Pod {
		return []*v1.Pod{}
	}

	fakeClock := clock.NewFakeClock(time.Now())
	podKiller := &mockPodKiller{}
	diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
	diskGC := &mockDiskGC{err: nil}
	nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

	config := Config{
		MaxPodGracePeriodSeconds: 5,
		PressureTransitionPeriod: time.Minute * 5,
		Thresholds: []evictionapi.Threshold{
			{
				Signal: evictionapi.SignalMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("1Gi"),
				},
			},
		},
		PodCgroupRoot: "kubepods",
	}
	summaryProvider := &fakeSummaryProvider{result: makeMemoryStats("2Gi", map[*v1.Pod]statsapi.PodStats{})}

	thresholdNotifier := &MockThresholdNotifier{}
	thresholdNotifier.On("UpdateThreshold", summaryProvider.result).Return(nil).Twice()

	manager := &managerImpl{
		clock: fakeClock,
		killPodFunc: podKiller.killPodNow,
		imageGC: diskGC,
		containerGC: diskGC,
		config: config,
		recorder: &record.FakeRecorder{},
		summaryProvider: summaryProvider,
		nodeRef: nodeRef,
		nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
		thresholdsFirstObservedAt: thresholdsObservedAt{},
		thresholdNotifiers: []ThresholdNotifier{thresholdNotifier},
	}

	manager.synchronize(diskInfoProvider, activePodsFunc)
	// The UpdateThreshold method should have been called once, since this is the first run.
	thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1)

	manager.synchronize(diskInfoProvider, activePodsFunc)
	// The UpdateThreshold method should not have been called again, since not enough time has passed
	thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1)

	fakeClock.Step(2 * notifierRefreshInterval)
	manager.synchronize(diskInfoProvider, activePodsFunc)
	// The UpdateThreshold method should be called again since enough time has passed
	thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 2)

	// new memory threshold notifier that returns an error
	thresholdNotifier = &MockThresholdNotifier{}
	thresholdNotifier.On("UpdateThreshold", summaryProvider.result).Return(fmt.Errorf("error updating threshold"))
	thresholdNotifier.On("Description").Return("mock thresholdNotifier").Once()
	manager.thresholdNotifiers = []ThresholdNotifier{thresholdNotifier}

	fakeClock.Step(2 * notifierRefreshInterval)
	manager.synchronize(diskInfoProvider, activePodsFunc)
	// The UpdateThreshold method should be called because at least notifierRefreshInterval time has passed.
	// The Description method should be called because UpdateThreshold returned an error
	thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1)
	thresholdNotifier.AssertNumberOfCalls(t, "Description", 1)
}
@@ -21,13 +21,11 @@ import (
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/golang/glog"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/clock"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
	statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
@@ -40,21 +38,18 @@ const (
	unsupportedEvictionSignal = "unsupported eviction signal %v"
	// Reason is the reason reported back in status.
	Reason = "Evicted"
	// the message associated with the reason.
	message = "The node was low on resource: %v. "
	// additional information for containers exceeding requests
	containerMessage = "Container %s was using %s, which exceeds its request of %s. "
	// additional information for containers which have exceeded their ES limit
	containerEphemeralStorageMessage = "Container %s exceeded its local ephemeral storage limit %q. "
	// additional information for pods which have exceeded their ES limit
	podEphemeralStorageMessage = "Pod ephemeral local storage usage exceeds the total limit of containers %s. "
	// additional information for empty-dir volumes which have exceeded their size limit
	emptyDirMessage = "Usage of EmptyDir volume %q exceeds the limit %q. "
	// nodeLowMessageFmt is the message for evictions due to resource pressure.
	nodeLowMessageFmt = "The node was low on resource: %v. "
	// containerMessageFmt provides additional information for containers exceeding requests
	containerMessageFmt = "Container %s was using %s, which exceeds its request of %s. "
	// containerEphemeralStorageMessageFmt provides additional information for containers which have exceeded their ES limit
	containerEphemeralStorageMessageFmt = "Container %s exceeded its local ephemeral storage limit %q. "
	// podEphemeralStorageMessageFmt provides additional information for pods which have exceeded their ES limit
	podEphemeralStorageMessageFmt = "Pod ephemeral local storage usage exceeds the total limit of containers %s. "
	// emptyDirMessageFmt provides additional information for empty-dir volumes which have exceeded their size limit
	emptyDirMessageFmt = "Usage of EmptyDir volume %q exceeds the limit %q. "
	// inodes, number. internal to this module, used to account for local disk inode consumption.
	resourceInodes v1.ResourceName = "inodes"
	// this prevents constantly updating the memcg notifier if synchronize
	// is run frequently.
	notifierRefreshInterval = 10 * time.Second
	// OffendingContainersKey is the key in eviction event annotations for the list of container names which exceeded their requests
	OffendingContainersKey = "offending_containers"
	// OffendingContainersUsageKey is the key in eviction event annotations for the list of usage of containers which exceeded their requests
@@ -1007,6 +1002,10 @@ func isHardEvictionThreshold(threshold evictionapi.Threshold) bool {
	return threshold.GracePeriod == time.Duration(0)
}

func isAllocatableEvictionThreshold(threshold evictionapi.Threshold) bool {
	return threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable
}

// buildSignalToRankFunc returns ranking functions associated with resources
func buildSignalToRankFunc(withImageFs bool) map[evictionapi.Signal]rankFunc {
	signalToRankFunc := map[evictionapi.Signal]rankFunc{
@@ -1062,7 +1061,7 @@ func buildSignalToNodeReclaimFuncs(imageGC ImageGC, containerGC ContainerGC, wit
// evictionMessage constructs a useful message about why an eviction occurred, and annotations to provide metadata about the eviction
func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats statsFunc) (message string, annotations map[string]string) {
	annotations = make(map[string]string)
	message = fmt.Sprintf(message, resourceToReclaim)
	message = fmt.Sprintf(nodeLowMessageFmt, resourceToReclaim)
	containers := []string{}
	containerUsage := []string{}
	podStats, ok := stats(pod)
@@ -1085,7 +1084,7 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats
		}
	}
	if usage != nil && usage.Cmp(requests) > 0 {
		message += fmt.Sprintf(containerMessage, container.Name, usage.String(), requests.String())
		message += fmt.Sprintf(containerMessageFmt, container.Name, usage.String(), requests.String())
		containers = append(containers, container.Name)
		containerUsage = append(containerUsage, usage.String())
	}
@@ -1097,38 +1096,3 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats
	annotations[StarvedResourceKey] = string(resourceToReclaim)
	return
}

// thresholdStopCh is a ThresholdStopCh which can only be closed after notifierRefreshInterval time has passed
type thresholdStopCh struct {
	lock sync.Mutex
	ch chan struct{}
	startTime time.Time
	// used to track time
	clock clock.Clock
}

// NewInitialStopCh returns a ThresholdStopCh which can be closed immediately
func NewInitialStopCh(clock clock.Clock) ThresholdStopCh {
	return &thresholdStopCh{ch: make(chan struct{}), clock: clock}
}

// implements ThresholdStopCh.Reset
func (t *thresholdStopCh) Reset() (closed bool) {
	t.lock.Lock()
	defer t.lock.Unlock()
	closed = t.clock.Since(t.startTime) > notifierRefreshInterval
	if closed {
		// close the old channel and reopen a new one
		close(t.ch)
		t.startTime = t.clock.Now()
		t.ch = make(chan struct{})
	}
	return
}

// implements ThresholdStopCh.Ch
func (t *thresholdStopCh) Ch() <-chan struct{} {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.ch
}
@@ -20,7 +20,6 @@ import (
	"fmt"
	"reflect"
	"sort"
	"sync"
	"testing"
	"time"

@@ -28,7 +27,6 @@ import (
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/clock"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
	statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
@@ -1948,34 +1946,3 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool {
	}
	return true
}

func TestThresholdStopCh(t *testing.T) {
	var wg sync.WaitGroup
	fakeClock := clock.NewFakeClock(time.Now())
	stop := NewInitialStopCh(fakeClock)

	// Should be able to reset the InitialStopCh right away
	if !stop.Reset() {
		t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
	}

	// Need to wait notifierRefreshInterval before closing
	if stop.Reset() {
		t.Errorf("Expected not to be able to close the initialStopCh, but was successful")
	}

	wg.Add(1)
	ch := stop.Ch()
	go func() {
		defer wg.Done()
		// wait for the channel to close
		<-ch
	}()

	fakeClock.Step(2 * notifierRefreshInterval)
	if !stop.Reset() {
		t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
	}
	// ensure the Reset() closed the channel
	wg.Wait()
}
pkg/kubelet/eviction/memory_threshold_notifier.go (new file, 135 lines)
@@ -0,0 +1,135 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eviction

import (
	"fmt"
	"time"

	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/api/resource"
	statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)

const (
	memoryUsageAttribute = "memory.usage_in_bytes"
	// this prevents constantly updating the memcg notifier if synchronize
	// is run frequently.
	notifierRefreshInterval = 10 * time.Second
)

type memoryThresholdNotifier struct {
	threshold evictionapi.Threshold
	cgroupPath string
	events chan struct{}
	factory NotifierFactory
	handler func(string)
	notifier CgroupNotifier
}

var _ ThresholdNotifier = &memoryThresholdNotifier{}

// NewMemoryThresholdNotifier creates a ThresholdNotifier which is designed to respond to the given threshold.
// UpdateThreshold must be called once before the threshold will be active.
func NewMemoryThresholdNotifier(threshold evictionapi.Threshold, cgroupRoot string, factory NotifierFactory, handler func(string)) (ThresholdNotifier, error) {
	cgroups, err := cm.GetCgroupSubsystems()
	if err != nil {
		return nil, err
	}
	cgpath, found := cgroups.MountPoints["memory"]
	if !found || len(cgpath) == 0 {
		return nil, fmt.Errorf("memory cgroup mount point not found")
	}
	if isAllocatableEvictionThreshold(threshold) {
		// for allocatable thresholds, point the cgroup notifier at the allocatable cgroup
		cgpath += cgroupRoot
	}
	return &memoryThresholdNotifier{
		threshold: threshold,
		cgroupPath: cgpath,
		events: make(chan struct{}),
		handler: handler,
		factory: factory,
	}, nil
}

func (m *memoryThresholdNotifier) Start() {
	glog.Infof("eviction manager: created %s", m.Description())
	for range m.events {
		m.handler(fmt.Sprintf("eviction manager: %s crossed", m.Description()))
	}
}

func (m *memoryThresholdNotifier) UpdateThreshold(summary *statsapi.Summary) error {
	memoryStats := summary.Node.Memory
	if isAllocatableEvictionThreshold(m.threshold) {
		allocatableContainer, err := getSysContainer(summary.Node.SystemContainers, statsapi.SystemContainerPods)
		if err != nil {
			return err
		}
		memoryStats = allocatableContainer.Memory
	}
	if memoryStats == nil || memoryStats.UsageBytes == nil || memoryStats.WorkingSetBytes == nil || memoryStats.AvailableBytes == nil {
		return fmt.Errorf("summary was incomplete. Expected MemoryStats and all subfields to be non-nil, but got %+v", memoryStats)
	}
	// Set threshold on usage to capacity - eviction_hard + inactive_file,
	// since we want to be notified when working_set = capacity - eviction_hard
	inactiveFile := resource.NewQuantity(int64(*memoryStats.UsageBytes-*memoryStats.WorkingSetBytes), resource.BinarySI)
	capacity := resource.NewQuantity(int64(*memoryStats.AvailableBytes+*memoryStats.WorkingSetBytes), resource.BinarySI)
	evictionThresholdQuantity := evictionapi.GetThresholdQuantity(m.threshold.Value, capacity)
	memcgThreshold := capacity.DeepCopy()
	memcgThreshold.Sub(*evictionThresholdQuantity)
	memcgThreshold.Add(*inactiveFile)

	glog.V(3).Infof("eviction manager: setting %s to %s\n", m.Description(), memcgThreshold.String())
	if m.notifier != nil {
		m.notifier.Stop()
	}
	newNotifier, err := m.factory.NewCgroupNotifier(m.cgroupPath, memoryUsageAttribute, memcgThreshold.Value())
	if err != nil {
		return err
	}
	m.notifier = newNotifier
	go m.notifier.Start(m.events)
	return nil
}
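As a worked illustration of the threshold arithmetic above, using the figures from the "node level threshold" case of TestUpdateThreshold later in this diff: with available = 3Gi, working set = 1Gi and usage = 2Gi, capacity = available + working_set = 4Gi and inactive_file = usage - working_set = 1Gi; with a 1Gi eviction threshold, the memcg threshold becomes 4Gi - 1Gi + 1Gi = 4Gi, which is the expectedThreshold asserted by that test case.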

func (m *memoryThresholdNotifier) Description() string {
	var hard, allocatable string
	if isHardEvictionThreshold(m.threshold) {
		hard = "hard "
	} else {
		hard = "soft "
	}
	if isAllocatableEvictionThreshold(m.threshold) {
		allocatable = "allocatable "
	}
	return fmt.Sprintf("%s%smemory eviction threshold", hard, allocatable)
}

var _ NotifierFactory = &CgroupNotifierFactory{}

// CgroupNotifierFactory knows how to make CgroupNotifiers which integrate with the kernel
type CgroupNotifierFactory struct{}

// NewCgroupNotifier implements the NotifierFactory interface
func (n *CgroupNotifierFactory) NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
	return NewCgroupNotifier(path, attribute, threshold)
}
pkg/kubelet/eviction/memory_threshold_notifier_test.go (new file, 270 lines)
@@ -0,0 +1,270 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eviction

import (
	"fmt"
	"strings"
	"sync"
	"testing"
	"time"

	"k8s.io/apimachinery/pkg/api/resource"
	statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)

const testCgroupPath = "/sys/fs/cgroups/memory"

func nodeSummary(available, workingSet, usage resource.Quantity, allocatable bool) *statsapi.Summary {
	availableBytes := uint64(available.Value())
	workingSetBytes := uint64(workingSet.Value())
	usageBytes := uint64(usage.Value())
	memoryStats := statsapi.MemoryStats{
		AvailableBytes: &availableBytes,
		WorkingSetBytes: &workingSetBytes,
		UsageBytes: &usageBytes,
	}
	if allocatable {
		return &statsapi.Summary{
			Node: statsapi.NodeStats{
				SystemContainers: []statsapi.ContainerStats{
					{
						Name: statsapi.SystemContainerPods,
						Memory: &memoryStats,
					},
				},
			},
		}
	}
	return &statsapi.Summary{
		Node: statsapi.NodeStats{
			Memory: &memoryStats,
		},
	}
}

func newTestMemoryThresholdNotifier(threshold evictionapi.Threshold, factory NotifierFactory, handler func(string)) *memoryThresholdNotifier {
	return &memoryThresholdNotifier{
		threshold: threshold,
		cgroupPath: testCgroupPath,
		events: make(chan struct{}),
		factory: factory,
		handler: handler,
	}
}

func TestUpdateThreshold(t *testing.T) {
	testCases := []struct {
		description string
		available resource.Quantity
		workingSet resource.Quantity
		usage resource.Quantity
		evictionThreshold evictionapi.Threshold
		expectedThreshold resource.Quantity
		updateThresholdErr error
		expectErr bool
	}{
		{
			description: "node level threshold",
			available: resource.MustParse("3Gi"),
			usage: resource.MustParse("2Gi"),
			workingSet: resource.MustParse("1Gi"),
			evictionThreshold: evictionapi.Threshold{
				Signal: evictionapi.SignalMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("1Gi"),
				},
			},
			expectedThreshold: resource.MustParse("4Gi"),
			updateThresholdErr: nil,
			expectErr: false,
		},
		{
			description: "allocatable threshold",
			available: resource.MustParse("4Gi"),
			usage: resource.MustParse("3Gi"),
			workingSet: resource.MustParse("1Gi"),
			evictionThreshold: evictionapi.Threshold{
				Signal: evictionapi.SignalAllocatableMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("1Gi"),
				},
			},
			expectedThreshold: resource.MustParse("6Gi"),
			updateThresholdErr: nil,
			expectErr: false,
		},
		{
			description: "error updating node level threshold",
			available: resource.MustParse("3Gi"),
			usage: resource.MustParse("2Gi"),
			workingSet: resource.MustParse("1Gi"),
			evictionThreshold: evictionapi.Threshold{
				Signal: evictionapi.SignalMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("1Gi"),
				},
			},
			expectedThreshold: resource.MustParse("4Gi"),
			updateThresholdErr: fmt.Errorf("unexpected error"),
			expectErr: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.description, func(t *testing.T) {
			notifierFactory := &MockNotifierFactory{}
			notifier := &MockCgroupNotifier{}
			m := newTestMemoryThresholdNotifier(tc.evictionThreshold, notifierFactory, nil)
			notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, tc.expectedThreshold.Value()).Return(notifier, tc.updateThresholdErr)
			var events chan<- struct{}
			events = m.events
			notifier.On("Start", events).Return()
			err := m.UpdateThreshold(nodeSummary(tc.available, tc.workingSet, tc.usage, isAllocatableEvictionThreshold(tc.evictionThreshold)))
			if err != nil && !tc.expectErr {
				t.Errorf("Unexpected error updating threshold: %v", err)
			} else if err == nil && tc.expectErr {
				t.Errorf("Expected error updating threshold, but got nil")
			}
			if !tc.expectErr {
				notifierFactory.AssertNumberOfCalls(t, "NewCgroupNotifier", 1)
			}
		})
	}
}

func TestStart(t *testing.T) {
	noResources := resource.MustParse("0")
	threshold := evictionapi.Threshold{
		Signal: evictionapi.SignalMemoryAvailable,
		Operator: evictionapi.OpLessThan,
		Value: evictionapi.ThresholdValue{
			Quantity: &noResources,
		},
	}
	notifier := &MockCgroupNotifier{}
	notifierFactory := &MockNotifierFactory{}

	var wg sync.WaitGroup
	wg.Add(4)
	m := newTestMemoryThresholdNotifier(threshold, notifierFactory, func(string) {
		wg.Done()
	})
	notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, int64(0)).Return(notifier, nil)
	var events chan<- struct{}
	events = m.events
	notifier.On("Start", events).Return()
	notifier.On("Stop").Return()

	err := m.UpdateThreshold(nodeSummary(noResources, noResources, noResources, isAllocatableEvictionThreshold(threshold)))
	if err != nil {
		t.Errorf("Unexpected error updating threshold: %v", err)
	}
	notifierFactory.AssertNumberOfCalls(t, "NewCgroupNotifier", 1)

	go m.Start()

	for i := 0; i < 4; i++ {
		m.events <- struct{}{}
	}
	wg.Wait()
}

func TestThresholdDescription(t *testing.T) {
	testCases := []struct {
		description string
		evictionThreshold evictionapi.Threshold
		expectedSubstrings []string
		omittedSubstrings []string
	}{
		{
			description: "hard node level threshold",
			evictionThreshold: evictionapi.Threshold{
				Signal: evictionapi.SignalMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("2Gi"),
				},
			},
			expectedSubstrings: []string{"hard"},
			omittedSubstrings: []string{"allocatable", "soft"},
		},
		{
			description: "soft node level threshold",
			evictionThreshold: evictionapi.Threshold{
				Signal: evictionapi.SignalMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("2Gi"),
				},
				GracePeriod: time.Minute * 2,
			},
			expectedSubstrings: []string{"soft"},
			omittedSubstrings: []string{"allocatable", "hard"},
		},
		{
			description: "hard allocatable threshold",
			evictionThreshold: evictionapi.Threshold{
				Signal: evictionapi.SignalAllocatableMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("2Gi"),
				},
				GracePeriod: time.Minute * 2,
			},
			expectedSubstrings: []string{"soft", "allocatable"},
			omittedSubstrings: []string{"hard"},
		},
		{
			description: "soft allocatable threshold",
			evictionThreshold: evictionapi.Threshold{
				Signal: evictionapi.SignalAllocatableMemoryAvailable,
				Operator: evictionapi.OpLessThan,
				Value: evictionapi.ThresholdValue{
					Quantity: quantityMustParse("2Gi"),
				},
			},
			expectedSubstrings: []string{"hard", "allocatable"},
			omittedSubstrings: []string{"soft"},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.description, func(t *testing.T) {
			m := &memoryThresholdNotifier{
				notifier: &MockCgroupNotifier{},
				threshold: tc.evictionThreshold,
				cgroupPath: testCgroupPath,
			}
			desc := m.Description()
			for _, expected := range tc.expectedSubstrings {
				if !strings.Contains(desc, expected) {
					t.Errorf("expected description for notifier with threshold %+v to contain %s, but it did not", tc.evictionThreshold, expected)
				}
			}
			for _, omitted := range tc.omittedSubstrings {
				if strings.Contains(desc, omitted) {
					t.Errorf("expected description for notifier with threshold %+v NOT to contain %s, but it did", tc.evictionThreshold, omitted)
				}
			}
		})
	}
}
pkg/kubelet/eviction/mock_threshold_notifier_test.go (new file, 98 lines)
@@ -0,0 +1,98 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eviction

import (
	mock "github.com/stretchr/testify/mock"
	statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// MockCgroupNotifier is a mock implementation of the CgroupNotifier interface
type MockCgroupNotifier struct {
	mock.Mock
}

// Start implements the NotifierFactory interface
func (m *MockCgroupNotifier) Start(a0 chan<- struct{}) {
	m.Called(a0)
}

// Stop implements the NotifierFactory interface
func (m *MockCgroupNotifier) Stop() {
	m.Called()
}

// MockNotifierFactory is a mock of the NotifierFactory interface
type MockNotifierFactory struct {
	mock.Mock
}

// NewCgroupNotifier implements the NotifierFactory interface
func (m *MockNotifierFactory) NewCgroupNotifier(a0, a1 string, a2 int64) (CgroupNotifier, error) {
	ret := m.Called(a0, a1, a2)

	var r0 CgroupNotifier
	if rf, ok := ret.Get(0).(func(string, string, int64) CgroupNotifier); ok {
		r0 = rf(a0, a1, a2)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).(CgroupNotifier)
		}
	}
	var r1 error
	if rf, ok := ret.Get(1).(func(string, string, int64) error); ok {
		r1 = rf(a0, a1, a2)
	} else {
		r1 = ret.Error(1)
	}
	return r0, r1
}

// MockThresholdNotifier is a mock implementation of the ThresholdNotifier interface
type MockThresholdNotifier struct {
	mock.Mock
}

// Start implements the ThresholdNotifier interface
func (m *MockThresholdNotifier) Start() {
	m.Called()
}

// UpdateThreshold implements the ThresholdNotifier interface
func (m *MockThresholdNotifier) UpdateThreshold(a0 *statsapi.Summary) error {
	ret := m.Called(a0)

	var r0 error
	if rf, ok := ret.Get(0).(func(*statsapi.Summary) error); ok {
		r0 = rf(a0)
	} else {
		r0 = ret.Error(0)
	}
	return r0
}

// Description implements the ThresholdNotifier interface
func (m *MockThresholdNotifier) Description() string {
	ret := m.Called()
	var r0 string
	if rf, ok := ret.Get(0).(func() string); ok {
		r0 = rf()
	} else {
		r0 = ret.String(0)
	}
	return r0
}
@@ -18,43 +18,47 @@ package eviction

import (
	"fmt"
	"sync"
	"time"

	"github.com/golang/glog"
	"golang.org/x/sys/unix"
)

type memcgThresholdNotifier struct {
	watchfd int
	controlfd int
	eventfd int
	handler thresholdNotifierHandlerFunc
	description string
const (
	// eventSize is the number of bytes returned by a successful read from an eventfd
	// see http://man7.org/linux/man-pages/man2/eventfd.2.html for more information
	eventSize = 8
	// numFdEvents is the number of events we can record at once.
	// If EpollWait finds more than this, they will be missed.
	numFdEvents = 6
)

type linuxCgroupNotifier struct {
	eventfd int
	epfd int
	stop chan struct{}
	stopLock sync.Mutex
}

var _ ThresholdNotifier = &memcgThresholdNotifier{}
var _ CgroupNotifier = &linuxCgroupNotifier{}

// NewMemCGThresholdNotifier sends notifications when a cgroup threshold
// is crossed (in either direction) for a given cgroup attribute
func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) {
	watchfd, err := unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY, 0)
// NewCgroupNotifier returns a linuxCgroupNotifier, which performs cgroup control operations required
// to receive notifications from the cgroup when the threshold is crossed in either direction.
func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
	var watchfd, eventfd, epfd, controlfd int
	var err error
	watchfd, err = unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			unix.Close(watchfd)
		}
	}()
	controlfd, err := unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY, 0)
	defer unix.Close(watchfd)
	controlfd, err = unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY, 0)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			unix.Close(controlfd)
		}
	}()
	eventfd, err := unix.Eventfd(0, unix.EFD_CLOEXEC)
	defer unix.Close(controlfd)
	eventfd, err = unix.Eventfd(0, unix.EFD_CLOEXEC)
	if err != nil {
		return nil, err
	}
@@ -63,55 +67,119 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h
		return nil, err
	}
	defer func() {
		// Close eventfd if we get an error later in initialization
		if err != nil {
			unix.Close(eventfd)
		}
	}()
	glog.V(3).Infof("eviction: setting notification threshold to %s", threshold)
	config := fmt.Sprintf("%d %d %s", eventfd, watchfd, threshold)
	epfd, err = unix.EpollCreate1(0)
	if err != nil {
		return nil, err
	}
	if epfd < 0 {
		err = fmt.Errorf("EpollCreate1 call failed")
		return nil, err
	}
	defer func() {
		// Close epfd if we get an error later in initialization
		if err != nil {
			unix.Close(epfd)
		}
	}()
	config := fmt.Sprintf("%d %d %d", eventfd, watchfd, threshold)
	_, err = unix.Write(controlfd, []byte(config))
	if err != nil {
		return nil, err
	}
	return &memcgThresholdNotifier{
		watchfd: watchfd,
		controlfd: controlfd,
		eventfd: eventfd,
		handler: handler,
		description: description,
	return &linuxCgroupNotifier{
		eventfd: eventfd,
		epfd: epfd,
		stop: make(chan struct{}),
	}, nil
}
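For context, the string written to cgroup.event_control above follows the cgroup v1 memory threshold notification API: "<eventfd> <fd of memory.usage_in_bytes> <threshold in bytes>". With this change the threshold is passed around as an int64 and formatted with %d, rather than pre-formatted into a string and interpolated with %s as before.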

func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stop ThresholdStopCh) {
func (n *linuxCgroupNotifier) Start(eventCh chan<- struct{}) {
	err := unix.EpollCtl(n.epfd, unix.EPOLL_CTL_ADD, n.eventfd, &unix.EpollEvent{
		Fd: int32(n.eventfd),
		Events: unix.EPOLLIN,
	})
	if err != nil {
		glog.Warningf("eviction manager: error adding epoll eventfd: %v", err)
		return
	}
	for {
		buf := make([]byte, 8)
		_, err := unix.Read(eventfd, buf)
		select {
		case <-n.stop:
			return
		default:
		}
		event, err := wait(n.epfd, n.eventfd, notifierRefreshInterval)
		if err != nil {
			glog.Warningf("eviction manager: error while waiting for memcg events: %v", err)
			return
		} else if !event {
			// Timeout on wait. This is expected if the threshold was not crossed
			continue
		}
		// Consume the event from the eventfd
		buf := make([]byte, eventSize)
		_, err = unix.Read(n.eventfd, buf)
		if err != nil {
			glog.Warningf("eviction manager: error reading memcg events: %v", err)
			return
		}

		select {
		case eventCh <- struct{}{}:
		case <-stop.Ch():
			return
		}
		eventCh <- struct{}{}
	}
}

func (n *memcgThresholdNotifier) Start(stop ThresholdStopCh) {
	eventCh := make(chan struct{})
	go getThresholdEvents(n.eventfd, eventCh, stop)
	for {
		select {
		case <-stop.Ch():
			glog.V(3).Infof("eviction: stopping threshold notifier")
			unix.Close(n.watchfd)
			unix.Close(n.controlfd)
			unix.Close(n.eventfd)
			return
		case <-eventCh:
			glog.V(2).Infof("eviction: threshold crossed")
			n.handler(n.description)
// wait waits up to notifierRefreshInterval for an event on the Epoll FD for the
// eventfd we are concerned about. It returns an error if one occurs, and true
// if the consumer should read from the eventfd.
func wait(epfd, eventfd int, timeout time.Duration) (bool, error) {
	events := make([]unix.EpollEvent, numFdEvents+1)
	timeoutMS := int(timeout / time.Millisecond)
	n, err := unix.EpollWait(epfd, events, timeoutMS)
	if n == -1 {
		if err == unix.EINTR {
			// Interrupt, ignore the error
			return false, nil
		}
		return false, err
	}
	if n == 0 {
		// Timeout
		return false, nil
	}
	if n > numFdEvents {
		return false, fmt.Errorf("epoll_wait returned more events than we know what to do with")
	}
	for _, event := range events[:n] {
		if event.Fd == int32(eventfd) {
			if event.Events&unix.EPOLLHUP != 0 || event.Events&unix.EPOLLERR != 0 || event.Events&unix.EPOLLIN != 0 {
				// EPOLLHUP: should not happen, but if it does, treat it as a wakeup.

				// EPOLLERR: If an error is waiting on the file descriptor, we should pretend
				// something is ready to read, and let unix.Read pick up the error.

				// EPOLLIN: There is data to read.
				return true, nil
			}
		}
	}
	// An event occurred that we don't care about.
	return false, nil
}

func (n *linuxCgroupNotifier) Stop() {
	n.stopLock.Lock()
	defer n.stopLock.Unlock()
	select {
	case <-n.stop:
		// the linuxCgroupNotifier is already stopped
		return
	default:
	}
	unix.Close(n.eventfd)
	unix.Close(n.epfd)
	close(n.stop)
}
@@ -18,10 +18,16 @@ limitations under the License.

package eviction

import "fmt"
import "github.com/golang/glog"

// NewMemCGThresholdNotifier sends notifications when a cgroup threshold
// is crossed (in either direction) for a given cgroup attribute
func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) {
	return nil, fmt.Errorf("threshold notification not supported")
// NewCgroupNotifier creates a cgroup notifier that does nothing because cgroups do not exist on non-linux systems.
func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
	glog.V(5).Infof("cgroup notifications not supported")
	return &unsupportedThresholdNotifier{}, nil
}

type unsupportedThresholdNotifier struct{}

func (*unsupportedThresholdNotifier) Start(_ chan<- struct{}) {}

func (*unsupportedThresholdNotifier) Stop() {}
|
@ -131,20 +131,30 @@ type nodeReclaimFunc func() error
|
||||
// nodeReclaimFuncs is an ordered list of nodeReclaimFunc
|
||||
type nodeReclaimFuncs []nodeReclaimFunc
|
||||
|
||||
// thresholdNotifierHandlerFunc is a function that takes action in response to a crossed threshold
|
||||
type thresholdNotifierHandlerFunc func(thresholdDescription string)
|
||||
|
||||
// ThresholdStopCh is an interface for a channel which is closed to stop waiting goroutines.
|
||||
// Implementations of ThresholdStopCh must correctly handle concurrent calls to all functions.
|
||||
type ThresholdStopCh interface {
|
||||
// Reset closes the channel if it can be closed, and returns true if it was closed.
|
||||
// Reset also creates a new channel.
|
||||
Reset() bool
|
||||
// Ch returns the channel that is closed when Reset() is called
|
||||
Ch() <-chan struct{}
|
||||
// CgroupNotifier generates events from cgroup events
|
||||
type CgroupNotifier interface {
|
||||
// Start causes the CgroupNotifier to begin notifying on the eventCh
|
||||
Start(eventCh chan<- struct{})
|
||||
// Stop stops all processes and cleans up file descriptors associated with the CgroupNotifier
|
||||
Stop()
|
||||
}
|
||||
|
||||
// ThresholdNotifier notifies the user when an attribute crosses a threshold value
|
||||
// NotifierFactory creates CgroupNotifer
|
||||
type NotifierFactory interface {
|
||||
// NewCgroupNotifier creates a CgroupNotifier that creates events when the threshold
|
||||
// on the attribute in the cgroup specified by the path is crossed.
|
||||
NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error)
|
||||
}
|
||||
|
||||
// ThresholdNotifier manages CgroupNotifiers based on memory eviction thresholds, and performs a function
|
||||
// when memory eviction thresholds are crossed
|
||||
type ThresholdNotifier interface {
|
||||
Start(ThresholdStopCh)
|
||||
// Start calls the notifier function when the CgroupNotifier notifies the ThresholdNotifier that an event occurred
|
||||
Start()
|
||||
// UpdateThreshold updates the memory cgroup threshold based on the metrics provided.
|
||||
// Calling UpdateThreshold with recent metrics allows the ThresholdNotifier to trigger at the
|
||||
// eviction threshold more accurately
|
||||
UpdateThreshold(summary *statsapi.Summary) error
|
||||
// Description produces a relevant string describing the Memory Threshold Notifier
|
||||
Description() string
|
||||
}
|
||||
|