diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index d1a08fc1081..bac18e416d4 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -112,6 +112,7 @@ type DynamicResources struct {
 	clientset   kubernetes.Interface
 	classLister resourcelisters.DeviceClassLister
 	sliceLister resourcelisters.ResourceSliceLister
+	celCache    *structured.CELCache
 
 	// claimAssumeCache enables temporarily storing a newer claim object
 	// while the scheduler has allocated it and the corresponding object
@@ -186,6 +187,11 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		classLister:      fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
 		sliceLister:      fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
 		claimAssumeCache: fh.ResourceClaimCache(),
+
+		// This is a LRU cache for compiled CEL expressions. The most
+		// recent 10 of them get reused across different scheduling
+		// cycles.
+		celCache: structured.NewCELCache(10),
 	}
 
 	return pl, nil
@@ -540,7 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 	if err != nil {
 		return nil, statusError(logger, err)
 	}
-	allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices)
+	allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices, pl.celCache)
 	if err != nil {
 		return nil, statusError(logger, err)
 	}
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod
index 8cbfbf386db..50a1a1b356d 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod
+++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod
@@ -8,6 +8,7 @@ godebug default=go1.23
 
 require (
 	github.com/blang/semver/v4 v4.0.0
+	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
 	github.com/google/cel-go v0.21.0
 	github.com/google/go-cmp v0.6.0
 	github.com/onsi/gomega v1.33.1
@@ -36,7 +37,6 @@ require (
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
 	github.com/go-openapi/swag v0.23.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
-	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
index dd55602cf8c..e64c807f95f 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
@@ -30,6 +30,7 @@ import (
 	draapi "k8s.io/dynamic-resource-allocation/api"
 	"k8s.io/dynamic-resource-allocation/cel"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/keymutex"
 	"k8s.io/utils/ptr"
 )
 
@@ -45,6 +46,8 @@ type Allocator struct {
 	allocatedDevices   []DeviceID
 	classLister        resourcelisters.DeviceClassLister
 	slices             []*resourceapi.ResourceSlice
+	celCache           *CELCache
+	celMutex           keymutex.KeyMutex
 }
 
 // NewAllocator returns an allocator for a certain set of claims or an error if
@@ -58,6 +61,7 @@ func NewAllocator(ctx context.Context,
 	allocatedDevices []DeviceID,
 	classLister resourcelisters.DeviceClassLister,
 	slices []*resourceapi.ResourceSlice,
+	celCache *CELCache,
 ) (*Allocator, error) {
 	return &Allocator{
 		adminAccessEnabled: adminAccessEnabled,
@@ -65,6 +69,8 @@ func NewAllocator(ctx context.Context,
 		allocatedDevices:   allocatedDevices,
 		classLister:        classLister,
 		slices:             slices,
+		celCache:           celCache,
+		celMutex:           keymutex.NewHashed(0),
 	}, nil
 }
 
@@ -73,6 +79,25 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim {
 	return a.claimsToAllocate
 }
 
+func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult {
+	// Compiling a CEL expression is expensive enough that it is cheaper
+	// to lock a mutex than doing it several times in parallel.
+	a.celMutex.LockKey(expression)
+	//nolint:errcheck // Only returns an error for unknown keys, which isn't the case here.
+	defer a.celMutex.UnlockKey(expression)
+
+	cached := a.celCache.get(expression)
+	if cached != nil {
+		return *cached
+	}
+
+	expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{})
+	if expr.Error == nil {
+		a.celCache.add(expression, &expr)
+	}
+	return expr
+}
+
 // Allocate calculates the allocation(s) for one particular node.
 //
 // It returns an error only if some fatal problem occurred. These are errors
@@ -697,7 +722,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSli
 
 func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
 	for i, selector := range selectors {
-		expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{})
+		expr := alloc.compileCELExpression(selector.CEL.Expression)
 		if expr.Error != nil {
 			// Could happen if some future apiserver accepted some
 			// future expression and then got downgraded. Normally
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
index 6b2816d47fa..c9d87325bdb 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
@@ -1375,7 +1375,7 @@ func TestAllocator(t *testing.T) {
 			allocatedDevices := slices.Clone(tc.allocatedDevices)
 			slices := slices.Clone(tc.slices)
 
-			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices)
+			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1))
 			g.Expect(err).ToNot(gomega.HaveOccurred())
 
 			results, err := allocator.Allocate(ctx, tc.node)
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go
new file mode 100644
index 00000000000..f4c84180fe5
--- /dev/null
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package structured
+
+import (
+	"sync"
+
+	"github.com/golang/groupcache/lru"
+
+	"k8s.io/dynamic-resource-allocation/cel"
+)
+
+// CELCache is an LRU cache for a compiled CEL expression.
+type CELCache struct {
+	mutex sync.RWMutex
+	cache *lru.Cache
+}
+
+// NewCELCache returns a CELCache
+func NewCELCache(maxCacheEntries int) *CELCache {
+	return &CELCache{
+		cache: lru.New(maxCacheEntries),
+	}
+}
+
+func (c *CELCache) add(expression string, expr *cel.CompilationResult) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.cache.Add(expression, expr)
+}
+
+func (c *CELCache) get(expression string) *cel.CompilationResult {
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
+	expr, found := c.cache.Get(expression)
+	if !found {
+		return nil
+	}
+	return expr.(*cel.CompilationResult)
+}
diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go
index 8b25d9895dc..ce161911eab 100644
--- a/test/integration/scheduler_perf/dra.go
+++ b/test/integration/scheduler_perf/dra.go
@@ -290,6 +290,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
 		reflect.TypeOf(&v1.Node{}): true,
 	}
 	require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
+	celCache := structured.NewCELCache(10)
 
 	// The set of nodes is assumed to be fixed at this point.
 	nodes, err := nodeLister.List(labels.Everything())
@@ -319,7 +320,7 @@ claims:
 			}
 		}
 
-		allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices)
+		allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache)
 		tCtx.ExpectNoError(err, "create allocator")
 
 		rand.Shuffle(len(nodes), func(i, j int) {
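
For readers who want to see the caching pattern from this patch in isolation: the standalone sketch below reproduces the per-expression lock plus LRU combination that CELCache and Allocator.compileCELExpression implement above, using the same github.com/golang/groupcache/lru and k8s.io/utils/keymutex packages. The exprCache and compiled types and the fake "compilation" step are illustrative stand-ins, not part of this change; the real code caches cel.CompilationResult values produced by cel.GetCompiler().CompileCELExpression.

// Hypothetical standalone illustration -- not part of the patch above.
package main

import (
	"fmt"
	"strings"
	"sync"

	"github.com/golang/groupcache/lru"
	"k8s.io/utils/keymutex"
)

// compiled stands in for cel.CompilationResult from
// k8s.io/dynamic-resource-allocation/cel.
type compiled struct {
	normalized string
}

// exprCache mirrors the CELCache + keyed-mutex combination used by the
// allocator: the RWMutex guards the (not thread-safe) lru.Cache, while the
// keyed mutex ensures two goroutines asking for the same expression do not
// both pay the compilation cost.
type exprCache struct {
	mutex sync.RWMutex
	cache *lru.Cache
	locks keymutex.KeyMutex
}

func newExprCache(maxEntries int) *exprCache {
	return &exprCache{
		cache: lru.New(maxEntries),
		locks: keymutex.NewHashed(0), // 0 derives the bucket count from GOMAXPROCS
	}
}

func (c *exprCache) compile(expression string) *compiled {
	// Serialize work per expression; unrelated expressions still run in parallel.
	c.locks.LockKey(expression)
	//nolint:errcheck // UnlockKey only fails for unknown keys.
	defer c.locks.UnlockKey(expression)

	c.mutex.RLock()
	entry, found := c.cache.Get(expression)
	c.mutex.RUnlock()
	if found {
		return entry.(*compiled)
	}

	// Placeholder for the expensive step; the real code calls
	// cel.GetCompiler().CompileCELExpression and caches successful results.
	result := &compiled{normalized: strings.TrimSpace(expression)}

	c.mutex.Lock()
	c.cache.Add(expression, result)
	c.mutex.Unlock()
	return result
}

func main() {
	cache := newExprCache(10)
	a := cache.compile(`device.driver == "example.com"`) // compiled and stored
	b := cache.compile(`device.driver == "example.com"`) // served from the LRU
	fmt.Println(a == b)                                  // true: same cached object
}

The per-key mutex rather than a single global lock is the notable design choice: identical expressions are compiled only once even under concurrent scheduling cycles, while distinct expressions do not contend with each other.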