From 814c9428fd56aa81c9bbad27dd49a7c2948bfd09 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 4 Sep 2024 18:52:58 +0200 Subject: [PATCH] DRA scheduler: cache compiled CEL expressions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DeviceClasses and different requests are very likely to contain the same expression string. We don't need to compile that over and over again. To avoid hanging onto that cache longer than necessary, it's currently tied to each PreFilter/Filter combination. It might make sense to move this up into the scheduler plugin and thus reuse compiled expressions for different pods. goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 33.95 ± 4% 36.65 ± 2% +7.95% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 105.8 ± 2% 106.7 ± 3% ~ (p=0.177 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 100.7 ± 1% 119.7 ± 3% +18.82% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 90.78 ± 1% 121.10 ± 4% +33.40% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 50.51 ± 7% 63.72 ± 3% +26.17% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 103.7 ± 5% 110.2 ± 2% +6.32% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 28.50 ± 2% 28.16 ± 5% ~ (p=0.102 n=6) geomean 64.99 73.15 +12.56% --- .../dynamicresources/dynamicresources.go | 8 ++- .../k8s.io/dynamic-resource-allocation/go.mod | 2 +- .../structured/allocator.go | 27 +++++++++- .../structured/allocator_test.go | 2 +- .../structured/celcache.go | 54 +++++++++++++++++++ test/integration/scheduler_perf/dra.go | 3 +- 6 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index d1a08fc1081..bac18e416d4 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -112,6 +112,7 @@ type DynamicResources struct { clientset kubernetes.Interface classLister resourcelisters.DeviceClassLister sliceLister resourcelisters.ResourceSliceLister + celCache *structured.CELCache // claimAssumeCache enables temporarily storing a newer claim object // while the scheduler has allocated it and the corresponding object @@ -186,6 +187,11 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classLister: fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(), sliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(), claimAssumeCache: fh.ResourceClaimCache(), + + // This is a LRU cache for compiled CEL expressions. The most + // recent 10 of them get reused across different scheduling + // cycles. + celCache: structured.NewCELCache(10), } return pl, nil @@ -540,7 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl if err != nil { return nil, statusError(logger, err) } - allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices) + allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices, pl.celCache) if err != nil { return nil, statusError(logger, err) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index 8cbfbf386db..50a1a1b356d 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -8,6 +8,7 @@ godebug default=go1.23 require ( github.com/blang/semver/v4 v4.0.0 + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/google/cel-go v0.21.0 github.com/google/go-cmp v0.6.0 github.com/onsi/gomega v1.33.1 @@ -36,7 +37,6 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index dd55602cf8c..e64c807f95f 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -30,6 +30,7 @@ import ( draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" + "k8s.io/utils/keymutex" "k8s.io/utils/ptr" ) @@ -45,6 +46,8 @@ type Allocator struct { allocatedDevices []DeviceID classLister resourcelisters.DeviceClassLister slices []*resourceapi.ResourceSlice + celCache *CELCache + celMutex keymutex.KeyMutex } // NewAllocator returns an allocator for a certain set of claims or an error if @@ -58,6 +61,7 @@ func NewAllocator(ctx context.Context, allocatedDevices []DeviceID, classLister resourcelisters.DeviceClassLister, slices []*resourceapi.ResourceSlice, + celCache *CELCache, ) (*Allocator, error) { return &Allocator{ adminAccessEnabled: adminAccessEnabled, @@ -65,6 +69,8 @@ func NewAllocator(ctx context.Context, allocatedDevices: allocatedDevices, classLister: classLister, slices: slices, + celCache: celCache, + celMutex: keymutex.NewHashed(0), }, nil } @@ -73,6 +79,25 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim { return a.claimsToAllocate } +func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult { + // Compiling a CEL expression is expensive enough that it is cheaper + // to lock a mutex than doing it several times in parallel. + a.celMutex.LockKey(expression) + //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. + defer a.celMutex.UnlockKey(expression) + + cached := a.celCache.get(expression) + if cached != nil { + return *cached + } + + expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{}) + if expr.Error == nil { + a.celCache.add(expression, &expr) + } + return expr +} + // Allocate calculates the allocation(s) for one particular node. // // It returns an error only if some fatal problem occurred. These are errors @@ -697,7 +722,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSli func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { - expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{}) + expr := alloc.compileCELExpression(selector.CEL.Expression) if expr.Error != nil { // Could happen if some future apiserver accepted some // future expression and then got downgraded. Normally diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index 6b2816d47fa..c9d87325bdb 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -1375,7 +1375,7 @@ func TestAllocator(t *testing.T) { allocatedDevices := slices.Clone(tc.allocatedDevices) slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1)) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go new file mode 100644 index 00000000000..f4c84180fe5 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go @@ -0,0 +1,54 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package structured + +import ( + "sync" + + "github.com/golang/groupcache/lru" + + "k8s.io/dynamic-resource-allocation/cel" +) + +// CELCache is an LRU cache for a compiled CEL expression. +type CELCache struct { + mutex sync.RWMutex + cache *lru.Cache +} + +// NewCELCache returns a CELCache +func NewCELCache(maxCacheEntries int) *CELCache { + return &CELCache{ + cache: lru.New(maxCacheEntries), + } +} + +func (c *CELCache) add(expression string, expr *cel.CompilationResult) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.cache.Add(expression, expr) +} + +func (c *CELCache) get(expression string) *cel.CompilationResult { + c.mutex.RLock() + defer c.mutex.RUnlock() + expr, found := c.cache.Get(expression) + if !found { + return nil + } + return expr.(*cel.CompilationResult) +} diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index 8b25d9895dc..ce161911eab 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -290,6 +290,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { reflect.TypeOf(&v1.Node{}): true, } require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers") + celCache := structured.NewCELCache(10) // The set of nodes is assumed to be fixed at this point. nodes, err := nodeLister.List(labels.Everything()) @@ -319,7 +320,7 @@ claims: } } - allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices) + allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache) tCtx.ExpectNoError(err, "create allocator") rand.Shuffle(len(nodes), func(i, j int) {