From 7863d9a3819bfae228215d94982dab50748be56c Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 30 Oct 2024 10:36:56 +0100 Subject: [PATCH] DRA scheduler: refactor CEL compilation cache A better place is the cel package because a) the name can become shorter and b) it is tightly coupled with the compiler there. Moving the compilation into the cache simplifies the callers. --- .../dynamicresources/dynamicresources.go | 5 +- .../dynamic-resource-allocation/cel/cache.go | 79 +++++++++++++++ .../cel/cache_test.go | 98 +++++++++++++++++++ .../k8s.io/dynamic-resource-allocation/go.mod | 2 +- .../structured/allocator.go | 28 +----- .../structured/allocator_test.go | 3 +- .../structured/celcache.go | 54 ---------- test/integration/scheduler_perf/dra.go | 3 +- 8 files changed, 188 insertions(+), 84 deletions(-) create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go delete mode 100644 staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 8bc5a44891e..0427e6deb08 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -38,6 +38,7 @@ import ( resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/client-go/util/retry" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/klog/v2" @@ -111,7 +112,7 @@ type DynamicResources struct { clientset kubernetes.Interface classLister resourcelisters.DeviceClassLister sliceLister resourcelisters.ResourceSliceLister - celCache *structured.CELCache + celCache *cel.Cache allocatedDevices *allocatedDevices // claimAssumeCache enables temporarily storing a newer claim object @@ -192,7 +193,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe // This is a LRU cache for compiled CEL expressions. The most // recent 10 of them get reused across different scheduling // cycles. - celCache: structured.NewCELCache(10), + celCache: cel.NewCache(10), allocatedDevices: newAllocatedDevices(logger), } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go new file mode 100644 index 00000000000..2868886c5bb --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go @@ -0,0 +1,79 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "sync" + + "k8s.io/utils/keymutex" + "k8s.io/utils/lru" +) + +// Cache is a thread-safe LRU cache for a compiled CEL expression. +type Cache struct { + compileMutex keymutex.KeyMutex + cacheMutex sync.RWMutex + cache *lru.Cache +} + +// NewCache creates a cache. The maximum number of entries determines +// how many entries are cached at most before dropping the oldest +// entry. +func NewCache(maxCacheEntries int) *Cache { + return &Cache{ + compileMutex: keymutex.NewHashed(0), + cache: lru.New(maxCacheEntries), + } +} + +// GetOrCompile checks whether the cache already has a compilation result +// and returns that if available. Otherwise it compiles, stores successful +// results and returns the new result. +func (c *Cache) GetOrCompile(expression string) CompilationResult { + // Compiling a CEL expression is expensive enough that it is cheaper + // to lock a mutex than doing it several times in parallel. + c.compileMutex.LockKey(expression) + //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. + defer c.compileMutex.UnlockKey(expression) + + cached := c.get(expression) + if cached != nil { + return *cached + } + + expr := GetCompiler().CompileCELExpression(expression, Options{}) + if expr.Error == nil { + c.add(expression, &expr) + } + return expr +} + +func (c *Cache) add(expression string, expr *CompilationResult) { + c.cacheMutex.Lock() + defer c.cacheMutex.Unlock() + c.cache.Add(expression, expr) +} + +func (c *Cache) get(expression string) *CompilationResult { + c.cacheMutex.RLock() + defer c.cacheMutex.RUnlock() + expr, found := c.cache.Get(expression) + if !found { + return nil + } + return expr.(*CompilationResult) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go new file mode 100644 index 00000000000..3c68a94db82 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCacheSemantic(t *testing.T) { + // Cache two entries. + // + // Entries are comparable structs with pointers inside. Each + // compilation leads to different pointers, so the entries can be + // compared by value to figure out whether an entry was cached or + // compiled anew. + cache := NewCache(2) + + // Successful compilations get cached. + resultTrue := cache.GetOrCompile("true") + require.Nil(t, resultTrue.Error) + resultTrueAgain := cache.GetOrCompile("true") + if resultTrue != resultTrueAgain { + t.Fatal("result of compiling `true` should have been cached") + } + + // Unsuccessful ones don't. + resultFailed := cache.GetOrCompile("no-such-variable") + require.NotNil(t, resultFailed.Error) + resultFailedAgain := cache.GetOrCompile("no-such-variable") + if resultFailed == resultFailedAgain { + t.Fatal("result of compiling `no-such-variable` should not have been cached") + } + + // The cache can hold a second result. + resultFalse := cache.GetOrCompile("false") + require.Nil(t, resultFalse.Error) + resultFalseAgain := cache.GetOrCompile("false") + if resultFalse != resultFalseAgain { + t.Fatal("result of compiling `false` should have been cached") + } + resultTrueAgain = cache.GetOrCompile("true") + if resultTrue != resultTrueAgain { + t.Fatal("result of compiling `true` should still have been cached") + } + + // A third result pushes out the least recently used one. + resultOther := cache.GetOrCompile("false && true") + require.Nil(t, resultFalse.Error) + resultOtherAgain := cache.GetOrCompile("false && true") + if resultOther != resultOtherAgain { + t.Fatal("result of compiling `false && true` should have been cached") + } + resultFalseAgain = cache.GetOrCompile("false") + if resultFalse == resultFalseAgain { + t.Fatal("result of compiling `false` should have been evicted from the cache") + } +} + +func TestCacheConcurrency(t *testing.T) { + // There's no guarantee that concurrent use of the cache would really + // trigger the race detector in `go test -race`, but in practice + // it does when not using the cacheMutex. + // + // The compileMutex ony affects performance and thus cannot be tested + // without benchmarking. + numWorkers := 10 + + cache := NewCache(2) + var wg sync.WaitGroup + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func(i int) { + defer wg.Done() + result := cache.GetOrCompile(fmt.Sprintf("%d == %d", i, i)) + assert.Nil(t, result.Error) + }(i) + } + wg.Wait() +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index 50a1a1b356d..8cbfbf386db 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -8,7 +8,6 @@ godebug default=go1.23 require ( github.com/blang/semver/v4 v4.0.0 - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/google/cel-go v0.21.0 github.com/google/go-cmp v0.6.0 github.com/onsi/gomega v1.33.1 @@ -37,6 +36,7 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index e858c77f1b5..f4c17487672 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -30,7 +30,6 @@ import ( draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" - "k8s.io/utils/keymutex" "k8s.io/utils/ptr" ) @@ -46,8 +45,7 @@ type Allocator struct { allocatedDevices sets.Set[DeviceID] classLister resourcelisters.DeviceClassLister slices []*resourceapi.ResourceSlice - celCache *CELCache - celMutex keymutex.KeyMutex + celCache *cel.Cache } // NewAllocator returns an allocator for a certain set of claims or an error if @@ -60,7 +58,7 @@ func NewAllocator(ctx context.Context, allocatedDevices sets.Set[DeviceID], classLister resourcelisters.DeviceClassLister, slices []*resourceapi.ResourceSlice, - celCache *CELCache, + celCache *cel.Cache, ) (*Allocator, error) { return &Allocator{ adminAccessEnabled: adminAccessEnabled, @@ -69,7 +67,6 @@ func NewAllocator(ctx context.Context, classLister: classLister, slices: slices, celCache: celCache, - celMutex: keymutex.NewHashed(0), }, nil } @@ -78,25 +75,6 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim { return a.claimsToAllocate } -func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult { - // Compiling a CEL expression is expensive enough that it is cheaper - // to lock a mutex than doing it several times in parallel. - a.celMutex.LockKey(expression) - //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. - defer a.celMutex.UnlockKey(expression) - - cached := a.celCache.get(expression) - if cached != nil { - return *cached - } - - expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{}) - if expr.Error == nil { - a.celCache.add(expression, &expr) - } - return expr -} - // Allocate calculates the allocation(s) for one particular node. // // It returns an error only if some fatal problem occurred. These are errors @@ -713,7 +691,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSli func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { - expr := alloc.compileCELExpression(selector.CEL.Expression) + expr := alloc.celCache.GetOrCompile(selector.CEL.Expression) if expr.Error != nil { // Could happen if some future apiserver accepted some // future expression and then got downgraded. Normally diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index e647660a3fe..8684108cdb2 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2/ktesting" "k8s.io/utils/ptr" ) @@ -1376,7 +1377,7 @@ func TestAllocator(t *testing.T) { allocatedDevices := slices.Clone(tc.allocatedDevices) slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1)) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1)) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go deleted file mode 100644 index f4c84180fe5..00000000000 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package structured - -import ( - "sync" - - "github.com/golang/groupcache/lru" - - "k8s.io/dynamic-resource-allocation/cel" -) - -// CELCache is an LRU cache for a compiled CEL expression. -type CELCache struct { - mutex sync.RWMutex - cache *lru.Cache -} - -// NewCELCache returns a CELCache -func NewCELCache(maxCacheEntries int) *CELCache { - return &CELCache{ - cache: lru.New(maxCacheEntries), - } -} - -func (c *CELCache) add(expression string, expr *cel.CompilationResult) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.cache.Add(expression, expr) -} - -func (c *CELCache) get(expression string) *cel.CompilationResult { - c.mutex.RLock() - defer c.mutex.RUnlock() - expr, found := c.cache.Get(expression) - if !found { - return nil - } - return expr.(*cel.CompilationResult) -} diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index 143f269da9e..a4cb1cfe905 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -34,6 +34,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" @@ -291,7 +292,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { reflect.TypeOf(&v1.Node{}): true, } require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers") - celCache := structured.NewCELCache(10) + celCache := cel.NewCache(10) // The set of nodes is assumed to be fixed at this point. nodes, err := nodeLister.List(labels.Everything())