Add updateAnyOp to scheduler_perf

This commit is contained in:
Maciej Skoczeń 2024-09-20 12:16:05 +00:00
parent 5fc4e71a30
commit 40154baab0
3 changed files with 187 additions and 14 deletions

View File

@ -104,22 +104,13 @@ func (c *createAny) create(tCtx ktesting.TContext, env map[string]any) {
// Not caching the discovery result isn't very efficient, but good enough when
// createAny isn't done often.
discoveryCache := memory.NewMemCacheClient(tCtx.Client().Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryCache)
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
mapping, err := restMappingFromUnstructuredObj(tCtx, obj)
if err != nil {
tCtx.Fatalf("%s: extract group+version from object %q: %v", c.TemplatePath, klog.KObj(obj), err)
tCtx.Fatalf("%s: %v", c.TemplatePath, err)
}
gk := schema.GroupKind{Group: gv.Group, Kind: obj.GetKind()}
resourceClient := tCtx.Dynamic().Resource(mapping.Resource)
create := func() error {
mapping, err := restMapper.RESTMapping(gk, gv.Version)
if err != nil {
// Cached mapping might be stale, refresh on next try.
restMapper.Reset()
return fmt.Errorf("map %q to resource: %v", gk, err)
}
resourceClient := tCtx.Dynamic().Resource(mapping.Resource)
options := metav1.CreateOptions{
// If the YAML input is invalid, then we want the
// apiserver to tell us via an error. This can
@ -129,12 +120,12 @@ func (c *createAny) create(tCtx ktesting.TContext, env map[string]any) {
}
if c.Namespace != "" {
if mapping.Scope.Name() != meta.RESTScopeNameNamespace {
return fmt.Errorf("namespace %q set for %q, but %q has scope %q", c.Namespace, c.TemplatePath, gk, mapping.Scope.Name())
return fmt.Errorf("namespace %q set for %q, but %q has scope %q", c.Namespace, c.TemplatePath, mapping.GroupVersionKind, mapping.Scope.Name())
}
_, err = resourceClient.Namespace(c.Namespace).Create(tCtx, obj, options)
} else {
if mapping.Scope.Name() != meta.RESTScopeNameRoot {
return fmt.Errorf("namespace not set for %q, but %q has scope %q", c.TemplatePath, gk, mapping.Scope.Name())
return fmt.Errorf("namespace not set for %q, but %q has scope %q", c.TemplatePath, mapping.GroupVersionKind, mapping.Scope.Name())
}
_, err = resourceClient.Create(tCtx, obj, options)
}
@ -175,3 +166,21 @@ func getSpecFromTextTemplateFile(path string, env map[string]any, spec interface
return yaml.UnmarshalStrict(buffer.Bytes(), spec)
}
func restMappingFromUnstructuredObj(tCtx ktesting.TContext, obj *unstructured.Unstructured) (*meta.RESTMapping, error) {
discoveryCache := memory.NewMemCacheClient(tCtx.Client().Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryCache)
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
if err != nil {
return nil, fmt.Errorf("extract group+version from object %q: %w", klog.KObj(obj), err)
}
gk := schema.GroupKind{Group: gv.Group, Kind: obj.GetKind()}
mapping, err := restMapper.RESTMapping(gk, gv.Version)
if err != nil {
// Cached mapping might be stale, refresh on next try.
restMapper.Reset()
return nil, fmt.Errorf("failed mapping %q to resource: %w", gk, err)
}
return mapping, nil
}

View File

@ -86,6 +86,7 @@ const (
createResourceClaimsOpcode operationCode = "createResourceClaims"
createResourceDriverOpcode operationCode = "createResourceDriver"
churnOpcode operationCode = "churn"
updateAnyOpcode operationCode = "updateAny"
barrierOpcode operationCode = "barrier"
sleepOpcode operationCode = "sleep"
startCollectingMetricsOpcode operationCode = "startCollectingMetrics"
@ -437,6 +438,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
createResourceClaimsOpcode: &createResourceClaimsOp{},
createResourceDriverOpcode: &createResourceDriverOp{},
churnOpcode: &churnOp{},
updateAnyOpcode: &updateAny{},
barrierOpcode: &barrierOp{},
sleepOpcode: &sleepOp{},
startCollectingMetricsOpcode: &startCollectingMetricsOp{},

View File

@ -0,0 +1,162 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package benchmark
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
// updateAny defines an op where some object gets updated from a YAML file.
// The nameset can be specified.
type updateAny struct {
// Must match updateAny.
Opcode operationCode
// Namespace the object should be updated in. Must be empty for cluster-scoped objects.
Namespace string
// Path to spec file describing the object to update.
// This will be processed with text/template.
// .Index will be in the range [0, Count-1] when updating
// more than one object. .Count is the total number of objects.
TemplatePath string
// Count determines how many objects get updated. Defaults to 1 if unset.
Count *int
// Template parameter for Count.
CountParam string
// Number of objects to be updated per second.
// If set to 0, all objects are updated at once.
// Optional
UpdatePerSecond int
// Internal field of the struct used for caching the mapping.
cachedMapping *meta.RESTMapping
}
var _ runnableOp = &updateAny{}
func (c *updateAny) isValid(allowParameterization bool) error {
if c.TemplatePath == "" {
return fmt.Errorf("TemplatePath must be set")
}
if c.UpdatePerSecond < 0 {
return fmt.Errorf("invalid UpdatePerSecond=%d; should be non-negative", c.UpdatePerSecond)
}
// The namespace can only be checked during later because we don't know yet
// whether the object is namespaced or cluster-scoped.
return nil
}
func (c *updateAny) collectsMetrics() bool {
return false
}
func (c updateAny) patchParams(w *workload) (realOp, error) {
if c.CountParam != "" {
count, err := w.Params.get(c.CountParam[1:])
if err != nil {
return nil, err
}
c.Count = ptr.To(count)
}
c.cachedMapping = nil
return &c, c.isValid(false)
}
func (c *updateAny) requiredNamespaces() []string {
if c.Namespace == "" {
return nil
}
return []string{c.Namespace}
}
func (c *updateAny) run(tCtx ktesting.TContext) {
count := 1
if c.Count != nil {
count = *c.Count
}
if c.UpdatePerSecond == 0 {
for index := 0; index < count; index++ {
err := c.update(tCtx, map[string]any{"Index": index, "Count": count})
if err != nil {
tCtx.Fatalf("Failed to update object: %w", err)
}
}
return
}
ticker := time.NewTicker(time.Second / time.Duration(c.UpdatePerSecond))
defer ticker.Stop()
for index := 0; index < count; index++ {
select {
case <-ticker.C:
err := c.update(tCtx, map[string]any{"Index": index, "Count": count})
if err != nil {
tCtx.Fatalf("Failed to update object: %w", err)
}
case <-tCtx.Done():
return
}
}
}
func (c *updateAny) update(tCtx ktesting.TContext, env map[string]any) error {
var obj *unstructured.Unstructured
if err := getSpecFromTextTemplateFile(c.TemplatePath, env, &obj); err != nil {
return fmt.Errorf("%s: parsing failed: %w", c.TemplatePath, err)
}
if c.cachedMapping == nil {
mapping, err := restMappingFromUnstructuredObj(tCtx, obj)
if err != nil {
return err
}
c.cachedMapping = mapping
}
resourceClient := tCtx.Dynamic().Resource(c.cachedMapping.Resource)
options := metav1.UpdateOptions{
// If the YAML input is invalid, then we want the
// apiserver to tell us via an error. This can
// happen because decoding into an unstructured object
// doesn't validate.
FieldValidation: "Strict",
}
if c.Namespace != "" {
if c.cachedMapping.Scope.Name() != meta.RESTScopeNameNamespace {
return fmt.Errorf("namespace %q set for %q, but %q has scope %q", c.Namespace, c.TemplatePath, c.cachedMapping.GroupVersionKind, c.cachedMapping.Scope.Name())
}
_, err := resourceClient.Namespace(c.Namespace).Update(tCtx, obj, options)
if err != nil {
return fmt.Errorf("failed to update object in namespace %q: %w", c.Namespace, err)
}
return nil
}
if c.cachedMapping.Scope.Name() != meta.RESTScopeNameRoot {
return fmt.Errorf("namespace not set for %q, but %q has scope %q", c.TemplatePath, c.cachedMapping.GroupVersionKind, c.cachedMapping.Scope.Name())
}
_, err := resourceClient.Update(tCtx, obj, options)
if err != nil {
return fmt.Errorf("failed to update object: %w", err)
}
return nil
}