DRA scheduler: cache compiled CEL expressions

DeviceClasses and different requests are very likely to contain the same
expression string. There is no need to compile it over and over again.

To avoid hanging onto that cache longer than necessary, it is currently tied to
each PreFilter/Filter combination. It might make sense to move this up into the
scheduler plugin and thus reuse compiled expressions for different pods.

    goos: linux
    goarch: amd64
    pkg: k8s.io/kubernetes/test/integration/scheduler_perf
    cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz
                                                                                       │            before            │                        after                        │
                                                                                       │ SchedulingThroughput/Average │ SchedulingThroughput/Average  vs base               │
    PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36                      33.95 ± 4%                     36.65 ± 2%   +7.95% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36                     105.8 ± 2%                     106.7 ± 3%        ~ (p=0.177 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36                     100.7 ± 1%                     119.7 ± 3%  +18.82% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36                      90.78 ± 1%                    121.10 ± 4%  +33.40% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36                      50.51 ± 7%                     63.72 ± 3%  +26.17% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36                      103.7 ± 5%                     110.2 ± 2%   +6.32% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36                      28.50 ± 2%                     28.16 ± 5%        ~ (p=0.102 n=6)
    geomean                                                                                                64.99                          73.15       +12.56%
commit 814c9428fd (parent 941d17b3b8)
Author: Patrick Ohly
Date:   2024-09-04 18:52:58 +02:00

6 changed files with 91 additions and 5 deletions
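The throughput gain above comes from skipping repeated CEL compilation. As a rough illustration, a micro-benchmark along these lines could contrast compiling on every call with a cache hit. This is a sketch, not part of this commit: the benchmark name and expression string are made up, and it has to sit in package structured because add and get are unexported.

    package structured

    import (
        "testing"

        "k8s.io/dynamic-resource-allocation/cel"
    )

    // BenchmarkCompileVersusCache contrasts compiling a selector expression
    // on every call with returning the already-compiled result from a CELCache.
    func BenchmarkCompileVersusCache(b *testing.B) {
        // Hypothetical device selector expression.
        const expression = `device.driver == "test.example.com"`

        b.Run("compile", func(b *testing.B) {
            for i := 0; i < b.N; i++ {
                _ = cel.GetCompiler().CompileCELExpression(expression, cel.Options{})
            }
        })

        b.Run("cached", func(b *testing.B) {
            cache := NewCELCache(10)
            expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{})
            cache.add(expression, &expr)
            for i := 0; i < b.N; i++ {
                _ = cache.get(expression)
            }
        })
    }

The cached variant reduces each lookup to an RWMutex-protected LRU access, which is what the scheduling-cycle numbers above reflect.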

@@ -112,6 +112,7 @@ type DynamicResources struct {
 	clientset   kubernetes.Interface
 	classLister resourcelisters.DeviceClassLister
 	sliceLister resourcelisters.ResourceSliceLister
+	celCache    *structured.CELCache

 	// claimAssumeCache enables temporarily storing a newer claim object
 	// while the scheduler has allocated it and the corresponding object
@@ -186,6 +187,11 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
 		classLister:      fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
 		sliceLister:      fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
 		claimAssumeCache: fh.ResourceClaimCache(),
+		// This is a LRU cache for compiled CEL expressions. The most
+		// recent 10 of them get reused across different scheduling
+		// cycles.
+		celCache: structured.NewCELCache(10),
 	}

 	return pl, nil
@@ -540,7 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
 	if err != nil {
 		return nil, statusError(logger, err)
 	}
-	allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices)
+	allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices, pl.celCache)
 	if err != nil {
 		return nil, statusError(logger, err)
 	}

@@ -8,6 +8,7 @@ godebug default=go1.23

 require (
 	github.com/blang/semver/v4 v4.0.0
+	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
 	github.com/google/cel-go v0.21.0
 	github.com/google/go-cmp v0.6.0
 	github.com/onsi/gomega v1.33.1
@@ -36,7 +37,6 @@ require (
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
 	github.com/go-openapi/swag v0.23.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
-	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect

@@ -30,6 +30,7 @@ import (
 	draapi "k8s.io/dynamic-resource-allocation/api"
 	"k8s.io/dynamic-resource-allocation/cel"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/keymutex"
 	"k8s.io/utils/ptr"
 )
@@ -45,6 +46,8 @@ type Allocator struct {
 	allocatedDevices []DeviceID
 	classLister      resourcelisters.DeviceClassLister
 	slices           []*resourceapi.ResourceSlice
+	celCache         *CELCache
+	celMutex         keymutex.KeyMutex
 }

 // NewAllocator returns an allocator for a certain set of claims or an error if
@@ -58,6 +61,7 @@ func NewAllocator(ctx context.Context,
 	allocatedDevices []DeviceID,
 	classLister resourcelisters.DeviceClassLister,
 	slices []*resourceapi.ResourceSlice,
+	celCache *CELCache,
 ) (*Allocator, error) {
 	return &Allocator{
 		adminAccessEnabled: adminAccessEnabled,
@@ -65,6 +69,8 @@ func NewAllocator(ctx context.Context,
 		allocatedDevices:   allocatedDevices,
 		classLister:        classLister,
 		slices:             slices,
+		celCache:           celCache,
+		celMutex:           keymutex.NewHashed(0),
 	}, nil
 }

@@ -73,6 +79,25 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim {
 	return a.claimsToAllocate
 }

+func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult {
+	// Compiling a CEL expression is expensive enough that it is cheaper
+	// to lock a mutex than doing it several times in parallel.
+	a.celMutex.LockKey(expression)
+	//nolint:errcheck // Only returns an error for unknown keys, which isn't the case here.
+	defer a.celMutex.UnlockKey(expression)
+
+	cached := a.celCache.get(expression)
+	if cached != nil {
+		return *cached
+	}
+
+	expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{})
+	if expr.Error == nil {
+		a.celCache.add(expression, &expr)
+	}
+	return expr
+}
+
 // Allocate calculates the allocation(s) for one particular node.
 //
 // It returns an error only if some fatal problem occurred. These are errors
@@ -697,7 +722,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSlice

 func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
 	for i, selector := range selectors {
-		expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{})
+		expr := alloc.compileCELExpression(selector.CEL.Expression)
 		if expr.Error != nil {
 			// Could happen if some future apiserver accepted some
 			// future expression and then got downgraded. Normally
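The compileCELExpression helper above combines two mechanisms: the shared CELCache makes compiled expressions reusable, and the per-key mutex ensures that concurrent callers compiling the same expression do the work only once, while different expressions still compile in parallel (keymutex.NewHashed(0) hashes each key onto a pool of mutexes sized by the number of CPUs). A stand-alone sketch of that pattern, with a stand-in for the expensive compile step and illustrative names that are not from the commit:

    package main

    import (
        "fmt"
        "sync"

        "k8s.io/utils/keymutex"
    )

    // expensiveCompile stands in for CEL compilation.
    func expensiveCompile(expression string) string {
        return "compiled(" + expression + ")"
    }

    type cachingCompiler struct {
        keyMutex keymutex.KeyMutex // serializes work per expression
        mutex    sync.RWMutex      // protects the cache map
        cache    map[string]string
    }

    func (c *cachingCompiler) compile(expression string) string {
        // The first caller for a given expression compiles it; later
        // callers for the same string block here and then hit the cache.
        c.keyMutex.LockKey(expression)
        //nolint:errcheck // UnlockKey only fails for unknown keys.
        defer c.keyMutex.UnlockKey(expression)

        c.mutex.RLock()
        result, ok := c.cache[expression]
        c.mutex.RUnlock()
        if ok {
            return result
        }

        result = expensiveCompile(expression)
        c.mutex.Lock()
        c.cache[expression] = result
        c.mutex.Unlock()
        return result
    }

    func main() {
        c := &cachingCompiler{
            keyMutex: keymutex.NewHashed(0),
            cache:    map[string]string{},
        }
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                fmt.Println(c.compile(`device.driver == "example.com"`))
            }()
        }
        wg.Wait()
    }

The same shape appears in the commit: LockKey/UnlockKey around a cache check, then compute and store on a miss.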

@@ -1375,7 +1375,7 @@ func TestAllocator(t *testing.T) {
 			allocatedDevices := slices.Clone(tc.allocatedDevices)
 			slices := slices.Clone(tc.slices)

-			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices)
+			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1))
 			g.Expect(err).ToNot(gomega.HaveOccurred())

 			results, err := allocator.Allocate(ctx, tc.node)

@@ -0,0 +1,54 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package structured
+
+import (
+	"sync"
+
+	"github.com/golang/groupcache/lru"
+
+	"k8s.io/dynamic-resource-allocation/cel"
+)
+
+// CELCache is an LRU cache for a compiled CEL expression.
+type CELCache struct {
+	mutex sync.RWMutex
+	cache *lru.Cache
+}
+
+// NewCELCache returns a CELCache.
+func NewCELCache(maxCacheEntries int) *CELCache {
+	return &CELCache{
+		cache: lru.New(maxCacheEntries),
+	}
+}
+
+func (c *CELCache) add(expression string, expr *cel.CompilationResult) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.cache.Add(expression, expr)
+}
+
+func (c *CELCache) get(expression string) *cel.CompilationResult {
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
+	expr, found := c.cache.Get(expression)
+	if !found {
+		return nil
+	}
+	return expr.(*cel.CompilationResult)
+}
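Since add and get are unexported, a unit test for the new type also has to live in package structured. A minimal sketch of such a test (hypothetical, not part of this commit), checking reuse and LRU eviction with a single-entry cache; the zero value of cel.CompilationResult is enough as a placeholder here:

    package structured

    import (
        "testing"

        "k8s.io/dynamic-resource-allocation/cel"
    )

    func TestCELCacheEviction(t *testing.T) {
        cache := NewCELCache(1) // room for exactly one compiled expression

        exprA := &cel.CompilationResult{}
        exprB := &cel.CompilationResult{}

        cache.add("expression-a", exprA)
        if got := cache.get("expression-a"); got != exprA {
            t.Errorf("expected cached result for expression-a, got %v", got)
        }

        // Adding a second entry evicts the least recently used one.
        cache.add("expression-b", exprB)
        if cache.get("expression-a") != nil {
            t.Error("expected expression-a to be evicted")
        }
        if got := cache.get("expression-b"); got != exprB {
            t.Errorf("expected cached result for expression-b, got %v", got)
        }
    }

Eviction order itself is delegated entirely to groupcache's lru.Cache; CELCache only adds locking, since that implementation is not thread-safe on its own.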

@@ -290,6 +290,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
 		reflect.TypeOf(&v1.Node{}): true,
 	}
 	require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
+	celCache := structured.NewCELCache(10)

 	// The set of nodes is assumed to be fixed at this point.
 	nodes, err := nodeLister.List(labels.Everything())
@@ -319,7 +320,7 @@ claims:
 			}
 		}

-		allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices)
+		allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache)
 		tCtx.ExpectNoError(err, "create allocator")

 		rand.Shuffle(len(nodes), func(i, j int) {