diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 8bc5a44891e..0427e6deb08 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -38,6 +38,7 @@ import ( resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/client-go/util/retry" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/klog/v2" @@ -111,7 +112,7 @@ type DynamicResources struct { clientset kubernetes.Interface classLister resourcelisters.DeviceClassLister sliceLister resourcelisters.ResourceSliceLister - celCache *structured.CELCache + celCache *cel.Cache allocatedDevices *allocatedDevices // claimAssumeCache enables temporarily storing a newer claim object @@ -192,7 +193,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe // This is a LRU cache for compiled CEL expressions. The most // recent 10 of them get reused across different scheduling // cycles. - celCache: structured.NewCELCache(10), + celCache: cel.NewCache(10), allocatedDevices: newAllocatedDevices(logger), } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go new file mode 100644 index 00000000000..2868886c5bb --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go @@ -0,0 +1,79 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "sync" + + "k8s.io/utils/keymutex" + "k8s.io/utils/lru" +) + +// Cache is a thread-safe LRU cache for a compiled CEL expression. +type Cache struct { + compileMutex keymutex.KeyMutex + cacheMutex sync.RWMutex + cache *lru.Cache +} + +// NewCache creates a cache. The maximum number of entries determines +// how many entries are cached at most before dropping the oldest +// entry. +func NewCache(maxCacheEntries int) *Cache { + return &Cache{ + compileMutex: keymutex.NewHashed(0), + cache: lru.New(maxCacheEntries), + } +} + +// GetOrCompile checks whether the cache already has a compilation result +// and returns that if available. Otherwise it compiles, stores successful +// results and returns the new result. +func (c *Cache) GetOrCompile(expression string) CompilationResult { + // Compiling a CEL expression is expensive enough that it is cheaper + // to lock a mutex than doing it several times in parallel. + c.compileMutex.LockKey(expression) + //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. 
+ defer c.compileMutex.UnlockKey(expression) + + cached := c.get(expression) + if cached != nil { + return *cached + } + + expr := GetCompiler().CompileCELExpression(expression, Options{}) + if expr.Error == nil { + c.add(expression, &expr) + } + return expr +} + +func (c *Cache) add(expression string, expr *CompilationResult) { + c.cacheMutex.Lock() + defer c.cacheMutex.Unlock() + c.cache.Add(expression, expr) +} + +func (c *Cache) get(expression string) *CompilationResult { + c.cacheMutex.RLock() + defer c.cacheMutex.RUnlock() + expr, found := c.cache.Get(expression) + if !found { + return nil + } + return expr.(*CompilationResult) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go new file mode 100644 index 00000000000..3c68a94db82 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCacheSemantic(t *testing.T) { + // Cache two entries. + // + // Entries are comparable structs with pointers inside. Each + // compilation leads to different pointers, so the entries can be + // compared by value to figure out whether an entry was cached or + // compiled anew. 
+	cache := NewCache(2)
+
+	// Successful compilations get cached.
+	resultTrue := cache.GetOrCompile("true")
+	require.Nil(t, resultTrue.Error)
+	resultTrueAgain := cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should have been cached")
+	}
+
+	// Unsuccessful ones don't.
+	resultFailed := cache.GetOrCompile("no-such-variable")
+	require.NotNil(t, resultFailed.Error)
+	resultFailedAgain := cache.GetOrCompile("no-such-variable")
+	if resultFailed == resultFailedAgain {
+		t.Fatal("result of compiling `no-such-variable` should not have been cached")
+	}
+
+	// The cache can hold a second result.
+	resultFalse := cache.GetOrCompile("false")
+	require.Nil(t, resultFalse.Error)
+	resultFalseAgain := cache.GetOrCompile("false")
+	if resultFalse != resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been cached")
+	}
+	resultTrueAgain = cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should still have been cached")
+	}
+
+	// A third result pushes out the least recently used one.
+	resultOther := cache.GetOrCompile("false && true")
+	require.Nil(t, resultOther.Error)
+	resultOtherAgain := cache.GetOrCompile("false && true")
+	if resultOther != resultOtherAgain {
+		t.Fatal("result of compiling `false && true` should have been cached")
+	}
+	resultFalseAgain = cache.GetOrCompile("false")
+	if resultFalse == resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been evicted from the cache")
+	}
+}
+
+func TestCacheConcurrency(t *testing.T) {
+	// There's no guarantee that concurrent use of the cache would really
+	// trigger the race detector in `go test -race`, but in practice
+	// it does when not using the cacheMutex.
+	//
+	// The compileMutex only affects performance and thus cannot be tested
+	// without benchmarking. 
+ numWorkers := 10 + + cache := NewCache(2) + var wg sync.WaitGroup + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func(i int) { + defer wg.Done() + result := cache.GetOrCompile(fmt.Sprintf("%d == %d", i, i)) + assert.Nil(t, result.Error) + }(i) + } + wg.Wait() +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index 50a1a1b356d..8cbfbf386db 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -8,7 +8,6 @@ godebug default=go1.23 require ( github.com/blang/semver/v4 v4.0.0 - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/google/cel-go v0.21.0 github.com/google/go-cmp v0.6.0 github.com/onsi/gomega v1.33.1 @@ -37,6 +36,7 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index e858c77f1b5..f4c17487672 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -30,7 +30,6 @@ import ( draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" - "k8s.io/utils/keymutex" "k8s.io/utils/ptr" ) @@ -46,8 +45,7 @@ type Allocator struct { allocatedDevices sets.Set[DeviceID] classLister resourcelisters.DeviceClassLister slices []*resourceapi.ResourceSlice - celCache *CELCache - celMutex keymutex.KeyMutex + celCache *cel.Cache } // NewAllocator returns an 
allocator for a certain set of claims or an error if @@ -60,7 +58,7 @@ func NewAllocator(ctx context.Context, allocatedDevices sets.Set[DeviceID], classLister resourcelisters.DeviceClassLister, slices []*resourceapi.ResourceSlice, - celCache *CELCache, + celCache *cel.Cache, ) (*Allocator, error) { return &Allocator{ adminAccessEnabled: adminAccessEnabled, @@ -69,7 +67,6 @@ func NewAllocator(ctx context.Context, classLister: classLister, slices: slices, celCache: celCache, - celMutex: keymutex.NewHashed(0), }, nil } @@ -78,25 +75,6 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim { return a.claimsToAllocate } -func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult { - // Compiling a CEL expression is expensive enough that it is cheaper - // to lock a mutex than doing it several times in parallel. - a.celMutex.LockKey(expression) - //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. - defer a.celMutex.UnlockKey(expression) - - cached := a.celCache.get(expression) - if cached != nil { - return *cached - } - - expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{}) - if expr.Error == nil { - a.celCache.add(expression, &expr) - } - return expr -} - // Allocate calculates the allocation(s) for one particular node. // // It returns an error only if some fatal problem occurred. These are errors @@ -713,7 +691,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSli func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { - expr := alloc.compileCELExpression(selector.CEL.Expression) + expr := alloc.celCache.GetOrCompile(selector.CEL.Expression) if expr.Error != nil { // Could happen if some future apiserver accepted some // future expression and then got downgraded. 
Normally diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index e647660a3fe..8684108cdb2 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2/ktesting" "k8s.io/utils/ptr" ) @@ -1376,7 +1377,7 @@ func TestAllocator(t *testing.T) { allocatedDevices := slices.Clone(tc.allocatedDevices) slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1)) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1)) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go deleted file mode 100644 index f4c84180fe5..00000000000 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package structured - -import ( - "sync" - - "github.com/golang/groupcache/lru" - - "k8s.io/dynamic-resource-allocation/cel" -) - -// CELCache is an LRU cache for a compiled CEL expression. -type CELCache struct { - mutex sync.RWMutex - cache *lru.Cache -} - -// NewCELCache returns a CELCache -func NewCELCache(maxCacheEntries int) *CELCache { - return &CELCache{ - cache: lru.New(maxCacheEntries), - } -} - -func (c *CELCache) add(expression string, expr *cel.CompilationResult) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.cache.Add(expression, expr) -} - -func (c *CELCache) get(expression string) *cel.CompilationResult { - c.mutex.RLock() - defer c.mutex.RUnlock() - expr, found := c.cache.Get(expression) - if !found { - return nil - } - return expr.(*cel.CompilationResult) -} diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index 143f269da9e..a4cb1cfe905 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -34,6 +34,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" @@ -291,7 +292,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { reflect.TypeOf(&v1.Node{}): true, } require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers") - celCache := structured.NewCELCache(10) + celCache := cel.NewCache(10) // The set of nodes is assumed to be fixed at this point. nodes, err := nodeLister.List(labels.Everything())