Merge remote-tracking branch 'origin/master' into release-1.31

This commit is contained in:
Kubernetes Release Robot 2024-07-31 22:46:25 +00:00
commit cb08f03fac
5 changed files with 594 additions and 8 deletions

View File

@ -1264,6 +1264,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
genericfeatures.AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha}, genericfeatures.AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha},
genericfeatures.ConcurrentWatchObjectDecode: {Default: false, PreRelease: featuregate.Beta},
genericfeatures.ConsistentListFromCache: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.ConsistentListFromCache: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha}, genericfeatures.CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -30,6 +30,12 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/apimachinery - staging/src/k8s.io/apimachinery
- name: release-1.31
go: 1.22.5
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/apimachinery
library: true library: true
- destination: api - destination: api
branches: branches:
@ -77,6 +83,15 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/api - staging/src/k8s.io/api
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/api
library: true library: true
- destination: client-go - destination: client-go
branches: branches:
@ -154,6 +169,21 @@ rules:
# assumes GO111MODULE=on # assumes GO111MODULE=on
go build -mod=mod ./... go build -mod=mod ./...
go test -mod=mod ./... go test -mod=mod ./...
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/client-go
smoke-test: |
# assumes GO111MODULE=on
go build -mod=mod ./...
go test -mod=mod ./...
library: true library: true
- destination: code-generator - destination: code-generator
branches: branches:
@ -192,6 +222,15 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/code-generator - staging/src/k8s.io/code-generator
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/code-generator
- destination: component-base - destination: component-base
branches: branches:
- name: master - name: master
@ -258,6 +297,19 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/component-base - staging/src/k8s.io/component-base
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/component-base
library: true library: true
- destination: component-helpers - destination: component-helpers
branches: branches:
@ -325,6 +377,19 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/component-helpers - staging/src/k8s.io/component-helpers
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/component-helpers
library: true library: true
- destination: kms - destination: kms
branches: branches:
@ -380,6 +445,15 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/kms - staging/src/k8s.io/kms
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/kms
library: true library: true
- destination: apiserver - destination: apiserver
branches: branches:
@ -467,6 +541,23 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/apiserver - staging/src/k8s.io/apiserver
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: kms
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/apiserver
library: true library: true
- destination: kube-aggregator - destination: kube-aggregator
branches: branches:
@ -574,6 +665,27 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/kube-aggregator - staging/src/k8s.io/kube-aggregator
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: kms
branch: release-1.31
- repository: code-generator
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/kube-aggregator
- destination: sample-apiserver - destination: sample-apiserver
branches: branches:
- name: master - name: master
@ -705,6 +817,32 @@ rules:
smoke-test: | smoke-test: |
# assumes GO111MODULE=on # assumes GO111MODULE=on
go build -mod=mod . go build -mod=mod .
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: code-generator
branch: release-1.31
- repository: kms
branch: release-1.31
- repository: component-base
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/sample-apiserver
required-packages:
- k8s.io/code-generator
smoke-test: |
# assumes GO111MODULE=on
go build -mod=mod .
- destination: sample-controller - destination: sample-controller
branches: branches:
- name: master - name: master
@ -806,6 +944,26 @@ rules:
smoke-test: | smoke-test: |
# assumes GO111MODULE=on # assumes GO111MODULE=on
go build -mod=mod . go build -mod=mod .
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: code-generator
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/sample-controller
required-packages:
- k8s.io/code-generator
smoke-test: |
# assumes GO111MODULE=on
go build -mod=mod .
- destination: apiextensions-apiserver - destination: apiextensions-apiserver
branches: branches:
- name: master - name: master
@ -922,6 +1080,29 @@ rules:
- staging/src/k8s.io/apiextensions-apiserver - staging/src/k8s.io/apiextensions-apiserver
required-packages: required-packages:
- k8s.io/code-generator - k8s.io/code-generator
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: code-generator
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: kms
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/apiextensions-apiserver
required-packages:
- k8s.io/code-generator
- destination: metrics - destination: metrics
branches: branches:
- name: master - name: master
@ -998,6 +1179,21 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/metrics - staging/src/k8s.io/metrics
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: code-generator
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/metrics
library: true library: true
- destination: cli-runtime - destination: cli-runtime
branches: branches:
@ -1065,6 +1261,19 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/cli-runtime - staging/src/k8s.io/cli-runtime
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: client-go
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/cli-runtime
library: true library: true
- destination: sample-cli-plugin - destination: sample-cli-plugin
branches: branches:
@ -1142,6 +1351,21 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/sample-cli-plugin - staging/src/k8s.io/sample-cli-plugin
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: cli-runtime
branch: release-1.31
- repository: client-go
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/sample-cli-plugin
- destination: kube-proxy - destination: kube-proxy
branches: branches:
- name: master - name: master
@ -1218,6 +1442,21 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/kube-proxy - staging/src/k8s.io/kube-proxy
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/kube-proxy
library: true library: true
- destination: cri-api - destination: cri-api
branches: branches:
@ -1250,6 +1489,12 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/cri-api - staging/src/k8s.io/cri-api
- name: release-1.31
go: 1.22.5
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/cri-api
library: true library: true
- destination: cri-client - destination: cri-client
branches: branches:
@ -1269,6 +1514,23 @@ rules:
branch: master branch: master
dirs: dirs:
- staging/src/k8s.io/cri-client - staging/src/k8s.io/cri-client
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: cri-api
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/cri-client
library: true library: true
- destination: kubelet - destination: kubelet
branches: branches:
@ -1370,6 +1632,27 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/kubelet - staging/src/k8s.io/kubelet
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: cri-api
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: kms
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/kubelet
library: true library: true
- destination: kube-scheduler - destination: kube-scheduler
branches: branches:
@ -1447,6 +1730,21 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/kube-scheduler - staging/src/k8s.io/kube-scheduler
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/kube-scheduler
library: true library: true
- destination: controller-manager - destination: controller-manager
branches: branches:
@ -1544,6 +1842,25 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/controller-manager - staging/src/k8s.io/controller-manager
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: kms
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/controller-manager
library: true library: true
- destination: cloud-provider - destination: cloud-provider
branches: branches:
@ -1661,6 +1978,29 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/cloud-provider - staging/src/k8s.io/cloud-provider
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: controller-manager
branch: release-1.31
- repository: component-helpers
branch: release-1.31
- repository: kms
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/cloud-provider
library: true library: true
- destination: kube-controller-manager - destination: kube-controller-manager
branches: branches:
@ -1788,6 +2128,31 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/kube-controller-manager - staging/src/k8s.io/kube-controller-manager
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: controller-manager
branch: release-1.31
- repository: cloud-provider
branch: release-1.31
- repository: component-helpers
branch: release-1.31
- repository: kms
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/kube-controller-manager
library: true library: true
- destination: cluster-bootstrap - destination: cluster-bootstrap
branches: branches:
@ -1845,6 +2210,17 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/cluster-bootstrap - staging/src/k8s.io/cluster-bootstrap
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: api
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/cluster-bootstrap
library: true library: true
- destination: csi-translation-lib - destination: csi-translation-lib
branches: branches:
@ -1902,6 +2278,17 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/csi-translation-lib - staging/src/k8s.io/csi-translation-lib
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/csi-translation-lib
library: true library: true
- destination: mount-utils - destination: mount-utils
branches: branches:
@ -1934,6 +2321,12 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/mount-utils - staging/src/k8s.io/mount-utils
- name: release-1.31
go: 1.22.5
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/mount-utils
library: true library: true
- destination: legacy-cloud-providers - destination: legacy-cloud-providers
branches: branches:
@ -2154,6 +2547,29 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/kubectl - staging/src/k8s.io/kubectl
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: cli-runtime
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: code-generator
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: component-helpers
branch: release-1.31
- repository: metrics
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/kubectl
library: true library: true
- destination: pod-security-admission - destination: pod-security-admission
branches: branches:
@ -2251,6 +2667,25 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/pod-security-admission - staging/src/k8s.io/pod-security-admission
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: kms
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/pod-security-admission
library: true library: true
- destination: dynamic-resource-allocation - destination: dynamic-resource-allocation
branches: branches:
@ -2360,6 +2795,31 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/dynamic-resource-allocation - staging/src/k8s.io/dynamic-resource-allocation
- name: release-1.31
go: 1.22.5
dependencies:
- repository: apimachinery
branch: release-1.31
- repository: apiserver
branch: release-1.31
- repository: api
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: cri-api
branch: release-1.31
- repository: component-base
branch: release-1.31
- repository: component-helpers
branch: release-1.31
- repository: kms
branch: release-1.31
- repository: kubelet
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/dynamic-resource-allocation
- destination: endpointslice - destination: endpointslice
branches: branches:
- name: master - name: master
@ -2421,6 +2881,21 @@ rules:
branch: release-1.30 branch: release-1.30
dirs: dirs:
- staging/src/k8s.io/endpointslice - staging/src/k8s.io/endpointslice
- name: release-1.31
go: 1.22.5
dependencies:
- repository: api
branch: release-1.31
- repository: apimachinery
branch: release-1.31
- repository: client-go
branch: release-1.31
- repository: component-base
branch: release-1.31
source:
branch: release-1.31
dirs:
- staging/src/k8s.io/endpointslice
recursive-delete-patterns: recursive-delete-patterns:
- '*/.gitattributes' - '*/.gitattributes'
default-go-version: 1.22.5 default-go-version: 1.22.5

View File

@ -101,6 +101,11 @@ const (
// Allows authorization to use field and label selectors. // Allows authorization to use field and label selectors.
AuthorizeWithSelectors featuregate.Feature = "AuthorizeWithSelectors" AuthorizeWithSelectors featuregate.Feature = "AuthorizeWithSelectors"
// owner: @serathius
// beta: v1.31
// Enables concurrent watch object decoding to avoid starving watch cache when conversion webhook is installed.
ConcurrentWatchObjectDecode featuregate.Feature = "ConcurrentWatchObjectDecode"
// owner: @cici37 @jpbetz // owner: @cici37 @jpbetz
// kep: http://kep.k8s.io/3488 // kep: http://kep.k8s.io/3488
// alpha: v1.26 // alpha: v1.26
@ -365,6 +370,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha}, AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha},
ConcurrentWatchObjectDecode: {Default: false, PreRelease: featuregate.Beta},
ValidatingAdmissionPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32 ValidatingAdmissionPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha}, CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -46,8 +46,9 @@ import (
const ( const (
// We have set a buffer in order to reduce times of context switches. // We have set a buffer in order to reduce times of context switches.
incomingBufSize = 100 incomingBufSize = 100
outgoingBufSize = 100 outgoingBufSize = 100
processEventConcurrency = 10
) )
// defaultWatcherMaxLimit is used to facilitate construction tests // defaultWatcherMaxLimit is used to facilitate construction tests
@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents) go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
var resultChanWG sync.WaitGroup var resultChanWG sync.WaitGroup
resultChanWG.Add(1) wc.processEvents(&resultChanWG)
go wc.processEvent(&resultChanWG)
select { select {
case err := <-wc.errChan: case err := <-wc.errChan:
@ -424,10 +424,17 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
close(watchClosedCh) close(watchClosedCh)
} }
// processEvent processes events from etcd watcher and sends results to resultChan. // processEvents processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) { func (wc *watchChan) processEvents(wg *sync.WaitGroup) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) {
wc.concurrentProcessEvents(wg)
} else {
wg.Add(1)
go wc.serialProcessEvents(wg)
}
}
func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
for { for {
select { select {
case e := <-wc.incomingEventChan: case e := <-wc.incomingEventChan:
@ -435,7 +442,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
if res == nil { if res == nil {
continue continue
} }
if len(wc.resultChan) == outgoingBufSize { if len(wc.resultChan) == cap(wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource) klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
} }
// If user couldn't receive results fast enough, we also block incoming events from watcher. // If user couldn't receive results fast enough, we also block incoming events from watcher.
@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
} }
} }
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
p := concurrentOrderedEventProcessing{
input: wc.incomingEventChan,
processFunc: wc.transform,
output: wc.resultChan,
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
objectType: wc.watcher.objectType,
groupResource: wc.watcher.groupResource,
}
wg.Add(1)
go func() {
defer wg.Done()
p.scheduleEventProcessing(wc.ctx, wg)
}()
wg.Add(1)
go func() {
defer wg.Done()
p.collectEventProcessing(wc.ctx)
}()
}
type concurrentOrderedEventProcessing struct {
input chan *event
processFunc func(*event) *watch.Event
output chan watch.Event
processingQueue chan chan *watch.Event
// Metadata for logging
objectType string
groupResource schema.GroupResource
}
func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {
var e *event
for {
select {
case <-ctx.Done():
return
case e = <-p.input:
}
processingResponse := make(chan *watch.Event, 1)
select {
case <-ctx.Done():
return
case p.processingQueue <- processingResponse:
}
wg.Add(1)
go func(e *event, response chan<- *watch.Event) {
defer wg.Done()
select {
case <-ctx.Done():
case response <- p.processFunc(e):
}
}(e, processingResponse)
}
}
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
var processingResponse chan *watch.Event
var e *watch.Event
for {
select {
case <-ctx.Done():
return
case processingResponse = <-p.processingQueue:
}
select {
case <-ctx.Done():
return
case e = <-processingResponse:
}
if e == nil {
continue
}
if len(p.output) == cap(p.output) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case <-ctx.Done():
return
case p.output <- *e:
}
}
}
func (wc *watchChan) filter(obj runtime.Object) bool { func (wc *watchChan) filter(obj runtime.Object) bool {
if wc.internalPred.Empty() { if wc.internalPred.Empty() {
return true return true

View File

@ -133,6 +133,12 @@ func TestEtcdWatchSemantics(t *testing.T) {
storagetesting.RunWatchSemantics(ctx, t, store) storagetesting.RunWatchSemantics(ctx, t, store)
} }
func TestEtcdWatchSemanticsWithConcurrentDecode(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true)
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemantics(ctx, t, store)
}
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) { func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
ctx, store, _ := testSetup(t) ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)