diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index fb6546e58d0..bb33636c016 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -475,6 +475,9 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
 		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
 		register("storage-version-gc", startStorageVersionGCController)
 	}
+	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
+		register("resource-claim-controller", startResourceClaimController)
+	}
 
 	return controllers
 }
diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go
index 616d4d25200..cdcbd854b67 100644
--- a/cmd/kube-controller-manager/app/core.go
+++ b/cmd/kube-controller-manager/app/core.go
@@ -54,6 +54,7 @@ import (
 	lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle"
 	"k8s.io/kubernetes/pkg/controller/podgc"
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim"
 	resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
 	serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
 	"k8s.io/kubernetes/pkg/controller/storageversiongc"
@@ -357,6 +358,21 @@ func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
 	return nil, true, nil
 }
 
+const defaultResourceClaimControllerWorkers = 10
+
+func startResourceClaimController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
+	claimController, err := resourceclaim.NewController(
+		controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"),
+		controllerContext.InformerFactory.Core().V1().Pods(),
+		controllerContext.InformerFactory.Resource().V1alpha1().ResourceClaims(),
+		controllerContext.InformerFactory.Resource().V1alpha1().ResourceClaimTemplates())
+	if err != nil {
+		return nil, true, fmt.Errorf("failed to start resource claim controller: %v", err)
+	}
+	go claimController.Run(ctx, defaultResourceClaimControllerWorkers)
+	return nil, true, nil
+}
+
 func startEndpointController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) {
 	go endpointcontroller.NewEndpointController(
 		controllerCtx.InformerFactory.Core().V1().Pods(),
diff --git a/cmd/kube-controller-manager/app/import_known_versions.go b/cmd/kube-controller-manager/app/import_known_versions.go
index a76ce309970..5b3dfa7cd71 100644
--- a/cmd/kube-controller-manager/app/import_known_versions.go
+++ b/cmd/kube-controller-manager/app/import_known_versions.go
@@ -33,6 +33,7 @@ import (
 	_ "k8s.io/kubernetes/pkg/apis/extensions/install"
 	_ "k8s.io/kubernetes/pkg/apis/policy/install"
 	_ "k8s.io/kubernetes/pkg/apis/rbac/install"
+	_ "k8s.io/kubernetes/pkg/apis/resource/install"
 	_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
 	_ "k8s.io/kubernetes/pkg/apis/storage/install"
 )
diff --git a/go.mod b/go.mod
index 58b42e8f898..adc2204b99c 100644
--- a/go.mod
+++ b/go.mod
@@ -107,6 +107,7 @@ require (
 	k8s.io/controller-manager v0.0.0
 	k8s.io/cri-api v0.0.0
 	k8s.io/csi-translation-lib v0.0.0
+	k8s.io/dynamic-resource-allocation v0.0.0
 	k8s.io/gengo v0.0.0-20220902162205-c0856e24416d
 	k8s.io/klog/v2 v2.80.1
 	k8s.io/kms v0.0.0
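Aside (not part of the patch): a minimal sketch of how the pieces above fit together if one wanted to run the new controller against a cluster directly, assuming in-cluster config and the same informer wiring that `startResourceClaimController` uses; the variable names and worker count are illustrative.

```go
package main

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/controller/resourceclaim"
)

func main() {
	ctx := context.Background()

	// Assumes the process runs inside a cluster; any rest.Config works.
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// The controller shares informers for pods, claims, and claim
	// templates, mirroring startResourceClaimController above.
	factory := informers.NewSharedInformerFactory(client, 0 /* no resync */)
	ec, err := resourceclaim.NewController(
		client,
		factory.Core().V1().Pods(),
		factory.Resource().V1alpha1().ResourceClaims(),
		factory.Resource().V1alpha1().ResourceClaimTemplates())
	if err != nil {
		panic(err)
	}

	factory.Start(ctx.Done())
	ec.Run(ctx, 10 /* workers, matching defaultResourceClaimControllerWorkers */)
}
```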
diff --git a/pkg/controller/resourceclaim/OWNERS b/pkg/controller/resourceclaim/OWNERS
index 1d5ec5e5051..8a274491d48 100644
--- a/pkg/controller/resourceclaim/OWNERS
+++ b/pkg/controller/resourceclaim/OWNERS
@@ -1,6 +1,12 @@
 # See the OWNERS docs at https://go.k8s.io/owners
 
 approvers:
-  - saad-ali
-  - jsafrane
+  - sig-node-approvers
+  - klueska
   - pohly
+reviewers:
+  - klueska
+  - pohly
+  - bart0sh
+labels:
+  - sig/node
diff --git a/pkg/controller/resourceclaim/config/doc.go b/pkg/controller/resourceclaim/config/doc.go
deleted file mode 100644
index 01d52859902..00000000000
--- a/pkg/controller/resourceclaim/config/doc.go
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
-Copyright 2021 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// +k8s:deepcopy-gen=package
-
-package config // import "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"
diff --git a/pkg/controller/resourceclaim/config/types.go b/pkg/controller/resourceclaim/config/types.go
deleted file mode 100644
index 64af06a1b2b..00000000000
--- a/pkg/controller/resourceclaim/config/types.go
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Copyright 2021 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package config
-
-// EphemeralVolumeControllerConfiguration contains elements describing EphemeralVolumeController.
-type EphemeralVolumeControllerConfiguration struct {
-	// ConcurrentEphemeralVolumeSyncs is the number of ephemeral volume syncing operations
-	// that will be done concurrently. Larger number = faster ephemeral volume updating,
-	// but more CPU (and network) load.
-	ConcurrentEphemeralVolumeSyncs int32
-}
diff --git a/pkg/controller/resourceclaim/config/v1alpha1/conversion.go b/pkg/controller/resourceclaim/config/v1alpha1/conversion.go
deleted file mode 100644
index d3a5fb58e96..00000000000
--- a/pkg/controller/resourceclaim/config/v1alpha1/conversion.go
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Copyright 2021 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package v1alpha1
-
-import (
-	"k8s.io/apimachinery/pkg/conversion"
-	"k8s.io/kube-controller-manager/config/v1alpha1"
-	"k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"
-)
-
-// Important! The public back-and-forth conversion functions for the types in this package
-// with EphemeralVolumeControllerConfiguration types need to be manually exposed like this in order for
-// other packages that reference this package to be able to call these conversion functions
-// in an autogenerated manner.
-// TODO: Fix the bug in conversion-gen so it automatically discovers these Convert_* functions
-// in autogenerated code as well.
-
-// Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration is an autogenerated conversion function.
-func Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in *v1alpha1.EphemeralVolumeControllerConfiguration, out *config.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
-	return autoConvert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in, out, s)
-}
-
-// Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration is an autogenerated conversion function.
-func Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in *config.EphemeralVolumeControllerConfiguration, out *v1alpha1.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
-	return autoConvert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in, out, s)
-}
diff --git a/pkg/controller/resourceclaim/config/v1alpha1/defaults.go b/pkg/controller/resourceclaim/config/v1alpha1/defaults.go
deleted file mode 100644
index 1907699d6cf..00000000000
--- a/pkg/controller/resourceclaim/config/v1alpha1/defaults.go
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Copyright 2021 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package v1alpha1
-
-import (
-	kubectrlmgrconfigv1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1"
-)
-
-// RecommendedDefaultEphemeralVolumeControllerConfiguration defaults a pointer to a
-// EphemeralVolumeControllerConfiguration struct. This will set the recommended default
-// values, but they may be subject to change between API versions. This function
-// is intentionally not registered in the scheme as a "normal" `SetDefaults_Foo`
-// function to allow consumers of this type to set whatever defaults for their
-// embedded configs. Forcing consumers to use these defaults would be problematic
-// as defaulting in the scheme is done as part of the conversion, and there would
-// be no easy way to opt-out. Instead, if you want to use this defaulting method
-// run it in your wrapper struct of this type in its `SetDefaults_` method.
-func RecommendedDefaultEphemeralVolumeControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.EphemeralVolumeControllerConfiguration) {
-	if obj.ConcurrentEphemeralVolumeSyncs == 0 {
-		obj.ConcurrentEphemeralVolumeSyncs = 5
-	}
-}
diff --git a/pkg/controller/resourceclaim/config/v1alpha1/doc.go b/pkg/controller/resourceclaim/config/v1alpha1/doc.go
deleted file mode 100644
index 0ed1dcc5526..00000000000
--- a/pkg/controller/resourceclaim/config/v1alpha1/doc.go
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
-Copyright 2021 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// +k8s:deepcopy-gen=package
-// +k8s:conversion-gen=k8s.io/kubernetes/pkg/controller/volume/ephemeral/config
-// +k8s:conversion-gen-external-types=k8s.io/kube-controller-manager/config/v1alpha1
-
-package v1alpha1 // import "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config/v1alpha1"
diff --git a/pkg/controller/resourceclaim/config/v1alpha1/register.go b/pkg/controller/resourceclaim/config/v1alpha1/register.go
deleted file mode 100644
index 63cff4d8810..00000000000
--- a/pkg/controller/resourceclaim/config/v1alpha1/register.go
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Copyright 2021 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package v1alpha1
-
-import (
-	"k8s.io/apimachinery/pkg/runtime"
-)
-
-var (
-	// SchemeBuilder is the scheme builder with scheme init functions to run for this API package
-	SchemeBuilder runtime.SchemeBuilder
-	// localSchemeBuilder extends the SchemeBuilder instance with the external types. In this package,
-	// defaulting and conversion init funcs are registered as well.
-	localSchemeBuilder = &SchemeBuilder
-	// AddToScheme is a global function that registers this API group & version to a scheme
-	AddToScheme = localSchemeBuilder.AddToScheme
-)
diff --git a/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.conversion.go b/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.conversion.go
deleted file mode 100644
index 34697f48409..00000000000
--- a/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.conversion.go
+++ /dev/null
@@ -1,92 +0,0 @@
-//go:build !ignore_autogenerated
-// +build !ignore_autogenerated
-
-/*
-Copyright The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Code generated by conversion-gen. DO NOT EDIT.
-
-package v1alpha1
-
-import (
-	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	conversion "k8s.io/apimachinery/pkg/conversion"
-	runtime "k8s.io/apimachinery/pkg/runtime"
-	v1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1"
-	config "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"
-)
-
-func init() {
-	localSchemeBuilder.Register(RegisterConversions)
-}
-
-// RegisterConversions adds conversion functions to the given scheme.
-// Public to allow building arbitrary schemes.
-func RegisterConversions(s *runtime.Scheme) error {
-	if err := s.AddGeneratedConversionFunc((*v1alpha1.GroupResource)(nil), (*v1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error {
-		return Convert_v1alpha1_GroupResource_To_v1_GroupResource(a.(*v1alpha1.GroupResource), b.(*v1.GroupResource), scope)
-	}); err != nil {
-		return err
-	}
-	if err := s.AddGeneratedConversionFunc((*v1.GroupResource)(nil), (*v1alpha1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error {
-		return Convert_v1_GroupResource_To_v1alpha1_GroupResource(a.(*v1.GroupResource), b.(*v1alpha1.GroupResource), scope)
-	}); err != nil {
-		return err
-	}
-	if err := s.AddConversionFunc((*config.EphemeralVolumeControllerConfiguration)(nil), (*v1alpha1.EphemeralVolumeControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
-		return Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(a.(*config.EphemeralVolumeControllerConfiguration), b.(*v1alpha1.EphemeralVolumeControllerConfiguration), scope)
-	}); err != nil {
-		return err
-	}
-	if err := s.AddConversionFunc((*v1alpha1.EphemeralVolumeControllerConfiguration)(nil), (*config.EphemeralVolumeControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
-		return Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(a.(*v1alpha1.EphemeralVolumeControllerConfiguration), b.(*config.EphemeralVolumeControllerConfiguration), scope)
-	}); err != nil {
-		return err
-	}
-	return nil
-}
-
-func autoConvert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in *v1alpha1.EphemeralVolumeControllerConfiguration, out *config.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
-	out.ConcurrentEphemeralVolumeSyncs = in.ConcurrentEphemeralVolumeSyncs
-	return nil
-}
-
-func autoConvert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in *config.EphemeralVolumeControllerConfiguration, out *v1alpha1.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
-	out.ConcurrentEphemeralVolumeSyncs = in.ConcurrentEphemeralVolumeSyncs
-	return nil
-}
-
-func autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error {
-	out.Group = in.Group
-	out.Resource = in.Resource
-	return nil
-}
-
-// Convert_v1alpha1_GroupResource_To_v1_GroupResource is an autogenerated conversion function.
-func Convert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error {
-	return autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in, out, s)
-}
-
-func autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error {
-	out.Group = in.Group
-	out.Resource = in.Resource
-	return nil
-}
-
-// Convert_v1_GroupResource_To_v1alpha1_GroupResource is an autogenerated conversion function.
-func Convert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error {
-	return autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in, out, s)
-}
diff --git a/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.deepcopy.go b/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.deepcopy.go
deleted file mode 100644
index 61f6555edfc..00000000000
--- a/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.deepcopy.go
+++ /dev/null
@@ -1,22 +0,0 @@
-//go:build !ignore_autogenerated
-// +build !ignore_autogenerated
-
-/*
-Copyright The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Code generated by deepcopy-gen. DO NOT EDIT.
-
-package v1alpha1
diff --git a/pkg/controller/resourceclaim/config/zz_generated.deepcopy.go b/pkg/controller/resourceclaim/config/zz_generated.deepcopy.go
deleted file mode 100644
index 467824bb7ec..00000000000
--- a/pkg/controller/resourceclaim/config/zz_generated.deepcopy.go
+++ /dev/null
@@ -1,38 +0,0 @@
-//go:build !ignore_autogenerated
-// +build !ignore_autogenerated
-
-/*
-Copyright The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Code generated by deepcopy-gen. DO NOT EDIT.
-
-package config
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *EphemeralVolumeControllerConfiguration) DeepCopyInto(out *EphemeralVolumeControllerConfiguration) {
-	*out = *in
-	return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EphemeralVolumeControllerConfiguration.
-func (in *EphemeralVolumeControllerConfiguration) DeepCopy() *EphemeralVolumeControllerConfiguration {
-	if in == nil {
-		return nil
-	}
-	out := new(EphemeralVolumeControllerConfiguration)
-	in.DeepCopyInto(out)
-	return out
-}
diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go
index f40527cba76..6a484b90037 100644
--- a/pkg/controller/resourceclaim/controller.go
+++ b/pkg/controller/resourceclaim/controller.go
@@ -14,58 +14,69 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package ephemeral
+package resourceclaim
 
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
-	"k8s.io/klog/v2"
-
 	v1 "k8s.io/api/core/v1"
+	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	coreinformers "k8s.io/client-go/informers/core/v1"
+	v1informers "k8s.io/client-go/informers/core/v1"
+	resourcev1alpha1informers "k8s.io/client-go/informers/resource/v1alpha1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
-	corelisters "k8s.io/client-go/listers/core/v1"
+	v1listers "k8s.io/client-go/listers/core/v1"
+	resourcev1alpha1listers "k8s.io/client-go/listers/resource/v1alpha1"
 	"k8s.io/client-go/tools/cache"
-	kcache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
-	"k8s.io/component-helpers/storage/ephemeral"
-	"k8s.io/kubernetes/pkg/controller/volume/common"
-	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
-	"k8s.io/kubernetes/pkg/controller/volume/events"
+	"k8s.io/dynamic-resource-allocation/resourceclaim"
+	"k8s.io/klog/v2"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
 )
 
-// Controller creates PVCs for ephemeral inline volumes in a pod spec.
-type Controller interface {
-	Run(ctx context.Context, workers int)
-}
+const (
+	// podResourceClaimIndex is the lookup name for the index function which indexes by pod ResourceClaim templates.
+	podResourceClaimIndex = "pod-resource-claim-index"
 
-type ephemeralController struct {
-	// kubeClient is the kube API client used by volumehost to communicate with
-	// the API server.
+	maxUIDCacheEntries = 500
+)
+
+// Controller creates ResourceClaims for ResourceClaimTemplates in a pod spec.
+type Controller struct {
+	// kubeClient is the kube API client used to communicate with the API
+	// server.
 	kubeClient clientset.Interface
 
-	// pvcLister is the shared PVC lister used to fetch and store PVC
+	// claimLister is the shared ResourceClaim lister used to fetch and store ResourceClaim
 	// objects from the API server. It is shared with other controllers and
-	// therefore the PVC objects in its store should be treated as immutable.
-	pvcLister  corelisters.PersistentVolumeClaimLister
-	pvcsSynced kcache.InformerSynced
+	// therefore the ResourceClaim objects in its store should be treated as immutable.
+	claimLister  resourcev1alpha1listers.ResourceClaimLister
+	claimsSynced cache.InformerSynced
 
 	// podLister is the shared Pod lister used to fetch Pod
 	// objects from the API server. It is shared with other controllers and
 	// therefore the Pod objects in its store should be treated as immutable.
-	podLister corelisters.PodLister
-	podSynced kcache.InformerSynced
+	podLister v1listers.PodLister
+	podSynced cache.InformerSynced
 
-	// podIndexer has the common PodPVC indexer indexer installed To
+	// templateLister is the shared ResourceClaimTemplate lister used to
+	// fetch template objects from the API server. It is shared with other
+	// controllers and therefore the objects in its store should be treated
+	// as immutable.
+	templateLister  resourcev1alpha1listers.ResourceClaimTemplateLister
+	templatesSynced cache.InformerSynced
+
+	// podIndexer has the common PodResourceClaim indexer installed to
 	// limit iteration over pods to those of interest.
 	podIndexer cache.Indexer
 
@@ -73,104 +84,158 @@ type ephemeralController struct {
 	recorder record.EventRecorder
 
 	queue workqueue.RateLimitingInterface
+
+	// The deletedObjects cache keeps track of Pods for which we know that
+	// they have existed and have been removed. For those we can be sure
+	// that a ReservedFor entry needs to be removed.
+	deletedObjects *uidCache
 }
 
-// NewController creates an ephemeral volume controller.
+const (
+	claimKeyPrefix = "claim:"
+	podKeyPrefix   = "pod:"
+)
+
+// NewController creates a ResourceClaim controller.
 func NewController(
 	kubeClient clientset.Interface,
-	podInformer coreinformers.PodInformer,
-	pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
+	podInformer v1informers.PodInformer,
+	claimInformer resourcev1alpha1informers.ResourceClaimInformer,
+	templateInformer resourcev1alpha1informers.ResourceClaimTemplateInformer) (*Controller, error) {
 
-	ec := &ephemeralController{
-		kubeClient: kubeClient,
-		podLister:  podInformer.Lister(),
-		podIndexer: podInformer.Informer().GetIndexer(),
-		podSynced:  podInformer.Informer().HasSynced,
-		pvcLister:  pvcInformer.Lister(),
-		pvcsSynced: pvcInformer.Informer().HasSynced,
-		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"),
+	ec := &Controller{
+		kubeClient:      kubeClient,
+		podLister:       podInformer.Lister(),
+		podIndexer:      podInformer.Informer().GetIndexer(),
+		podSynced:       podInformer.Informer().HasSynced,
+		claimLister:     claimInformer.Lister(),
+		claimsSynced:    claimInformer.Informer().HasSynced,
+		templateLister:  templateInformer.Lister(),
+		templatesSynced: templateInformer.Informer().HasSynced,
+		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
+		deletedObjects:  newUIDCache(maxUIDCacheEntries),
 	}
 
-	ephemeralvolumemetrics.RegisterMetrics()
+	metrics.RegisterMetrics()
 
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(klog.Infof)
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
-	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})
+	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
 
-	podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
-		AddFunc: ec.enqueuePod,
-		// The pod spec is immutable. Therefore the controller can ignore pod updates
-		// because there cannot be any changes that have to be copied into the generated
-		// PVC.
-		// Deletion of the PVC is handled through the owner reference and garbage collection.
-		// Therefore pod deletions also can be ignored.
-	})
-	pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
-		DeleteFunc: ec.onPVCDelete,
-	})
-	if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil {
-		return nil, fmt.Errorf("could not initialize pvc protection controller: %w", err)
+	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			ec.enqueuePod(obj, false)
+		},
+		UpdateFunc: func(old, updated interface{}) {
+			ec.enqueuePod(updated, false)
+		},
+		DeleteFunc: func(obj interface{}) {
+			ec.enqueuePod(obj, true)
+		},
+	}); err != nil {
+		return nil, err
+	}
+	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: ec.onResourceClaimAddOrUpdate,
+		UpdateFunc: func(old, updated interface{}) {
+			ec.onResourceClaimAddOrUpdate(updated)
+		},
+		DeleteFunc: ec.onResourceClaimDelete,
+	}); err != nil {
+		return nil, err
+	}
+	if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil {
+		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
 	}
 
 	return ec, nil
 }
 
-func (ec *ephemeralController) enqueuePod(obj interface{}) {
+func (ec *Controller) enqueuePod(obj interface{}, deleted bool) {
+	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+		obj = d.Obj
+	}
 	pod, ok := obj.(*v1.Pod)
 	if !ok {
+		// Not a pod?!
 		return
 	}
 
-	// Ignore pods which are already getting deleted.
-	if pod.DeletionTimestamp != nil {
+	if deleted {
+		ec.deletedObjects.Add(pod.UID)
+	}
+
+	if len(pod.Spec.ResourceClaims) == 0 {
+		// Nothing to do for it at all.
 		return
 	}
 
-	for _, vol := range pod.Spec.Volumes {
-		if vol.Ephemeral != nil {
-			// It has at least one ephemeral inline volume, work on it.
-			key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pod)
-			if err != nil {
-				runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err))
-				return
+	// Release reservations of a deleted or completed pod?
+	if deleted ||
+		podutil.IsPodTerminal(pod) ||
+		// Deleted and not scheduled:
+		pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" {
+		for _, podClaim := range pod.Spec.ResourceClaims {
+			claimName := resourceclaim.Name(pod, &podClaim)
+			ec.queue.Add(claimKeyPrefix + pod.Namespace + "/" + claimName)
+		}
+	}
+
+	// Create ResourceClaim for inline templates?
+	if pod.DeletionTimestamp == nil {
+		for _, podClaim := range pod.Spec.ResourceClaims {
+			if podClaim.Source.ResourceClaimTemplateName != nil {
+				// It has at least one inline template, work on it.
+				ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
+				break
 			}
-			ec.queue.Add(key)
-			break
 		}
 	}
 }
 
-func (ec *ephemeralController) onPVCDelete(obj interface{}) {
-	pvc, ok := obj.(*v1.PersistentVolumeClaim)
+func (ec *Controller) onResourceClaimAddOrUpdate(obj interface{}) {
+	claim, ok := obj.(*resourcev1alpha1.ResourceClaim)
 	if !ok {
 		return
 	}
 
-	// Someone deleted a PVC, either intentionally or
+	// When starting up, we have to check all claims to find those with
+	// stale pods in ReservedFor. During an update, a pod might get added
+	// that already no longer exists.
+	ec.queue.Add(claimKeyPrefix + claim.Namespace + "/" + claim.Name)
+}
+
+func (ec *Controller) onResourceClaimDelete(obj interface{}) {
+	claim, ok := obj.(*resourcev1alpha1.ResourceClaim)
+	if !ok {
+		return
+	}
+
+	// Someone deleted a ResourceClaim, either intentionally or
 	// accidentally. If there is a pod referencing it because of
-	// an ephemeral volume, then we should re-create the PVC.
+	// an inline resource, then we should re-create the ResourceClaim.
 	// The common indexer does some prefiltering for us by
 	// limiting the list to those pods which reference
-	// the PVC.
-	objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
+	// the ResourceClaim.
+	objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
 	if err != nil {
 		runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
 		return
 	}
 	for _, obj := range objs {
-		ec.enqueuePod(obj)
+		ec.enqueuePod(obj, false)
 	}
 }
 
-func (ec *ephemeralController) Run(ctx context.Context, workers int) {
+func (ec *Controller) Run(ctx context.Context, workers int) {
 	defer runtime.HandleCrash()
 	defer ec.queue.ShutDown()
 
 	klog.Infof("Starting ephemeral volume controller")
 	defer klog.Infof("Shutting down ephemeral volume controller")
 
-	if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.pvcsSynced) {
+	if !cache.WaitForNamedCacheSync("resource_claim", ctx.Done(), ec.podSynced, ec.claimsSynced, ec.templatesSynced) {
 		return
 	}
 
@@ -181,12 +246,12 @@ func (ec *ephemeralController) Run(ctx context.Context, workers int) {
 	<-ctx.Done()
 }
 
-func (ec *ephemeralController) runWorker(ctx context.Context) {
+func (ec *Controller) runWorker(ctx context.Context) {
 	for ec.processNextWorkItem(ctx) {
 	}
 }
 
-func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
+func (ec *Controller) processNextWorkItem(ctx context.Context) bool {
 	key, shutdown := ec.queue.Get()
 	if shutdown {
 		return false
@@ -205,65 +270,92 @@ func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
 	return true
 }
 
-// syncHandler is invoked for each pod which might need to be processed.
-// If an error is returned from this function, the pod will be requeued.
-func (ec *ephemeralController) syncHandler(ctx context.Context, key string) error {
-	namespace, name, err := kcache.SplitMetaNamespaceKey(key)
+// syncHandler is invoked for each work item which might need to be processed.
+// If an error is returned from this function, the item will be requeued.
+func (ec *Controller) syncHandler(ctx context.Context, key string) error {
+	sep := strings.Index(key, ":")
+	if sep < 0 {
+		return fmt.Errorf("unexpected key: %s", key)
+	}
+	prefix, object := key[0:sep+1], key[sep+1:]
+	namespace, name, err := cache.SplitMetaNamespaceKey(object)
 	if err != nil {
 		return err
 	}
+
+	switch prefix {
+	case podKeyPrefix:
+		return ec.syncPod(ctx, namespace, name)
+	case claimKeyPrefix:
+		return ec.syncClaim(ctx, namespace, name)
+	default:
+		return fmt.Errorf("unexpected key prefix: %s", prefix)
+	}
+}
+
+func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KRef(namespace, name))
+	ctx = klog.NewContext(ctx, logger)
 	pod, err := ec.podLister.Pods(namespace).Get(name)
 	if err != nil {
 		if errors.IsNotFound(err) {
-			klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is gone", key)
+			logger.V(5).Info("nothing to do for pod, it is gone")
 			return nil
 		}
-		klog.V(5).Infof("Error getting pod %s/%s (uid: %q) from informer : %v", pod.Namespace, pod.Name, pod.UID, err)
 		return err
 	}
 
 	// Ignore pods which are already getting deleted.
 	if pod.DeletionTimestamp != nil {
-		klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is marked for deletion", key)
+		logger.V(5).Info("nothing to do for pod, it is marked for deletion")
 		return nil
 	}
 
-	for _, vol := range pod.Spec.Volumes {
-		if err := ec.handleVolume(ctx, pod, vol); err != nil {
-			ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
-			return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
+	for _, podClaim := range pod.Spec.ResourceClaims {
+		if err := ec.handleClaim(ctx, pod, podClaim); err != nil {
+			ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
+			return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
 		}
 	}
 
 	return nil
 }
 
-// handleEphemeralVolume is invoked for each volume of a pod.
-func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
-	klog.V(5).Infof("ephemeral: checking volume %s", vol.Name)
-	if vol.Ephemeral == nil {
+// handleClaim is invoked for each resource claim of a pod.
+func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name)
+	ctx = klog.NewContext(ctx, logger)
+	logger.V(5).Info("checking", "podClaim", podClaim.Name)
+	templateName := podClaim.Source.ResourceClaimTemplateName
+	if templateName == nil {
 		return nil
 	}
 
-	pvcName := ephemeral.VolumeClaimName(pod, &vol)
-	pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
+	claimName := resourceclaim.Name(pod, &podClaim)
+	claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
 	if err != nil && !errors.IsNotFound(err) {
 		return err
 	}
-	if pvc != nil {
-		if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
+	if claim != nil {
+		if err := resourceclaim.IsForPod(pod, claim); err != nil {
 			return err
 		}
 		// Already created, nothing more to do.
-		klog.V(5).Infof("ephemeral: volume %s: PVC %s already created", vol.Name, pvcName)
+		logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName)
 		return nil
 	}
 
-	// Create the PVC with pod as owner.
+	template, err := ec.templateLister.ResourceClaimTemplates(pod.Namespace).Get(*templateName)
+	if err != nil {
+		return fmt.Errorf("resource claim template %q: %v", *templateName, err)
+	}
+
+	// Create the ResourceClaim with pod as owner.
 	isTrue := true
-	pvc = &v1.PersistentVolumeClaim{
+	claim = &resourcev1alpha1.ResourceClaim{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: pvcName,
+			Name: claimName,
 			OwnerReferences: []metav1.OwnerReference{
 				{
 					APIVersion: "v1",
@@ -274,16 +366,114 @@ func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
 					BlockOwnerDeletion: &isTrue,
 				},
 			},
-			Annotations: vol.Ephemeral.VolumeClaimTemplate.Annotations,
-			Labels:      vol.Ephemeral.VolumeClaimTemplate.Labels,
+			Annotations: template.Spec.ObjectMeta.Annotations,
+			Labels:      template.Spec.ObjectMeta.Labels,
 		},
-		Spec: vol.Ephemeral.VolumeClaimTemplate.Spec,
+		Spec: template.Spec.Spec,
 	}
-	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc()
-	_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
+	metrics.ResourceClaimCreateAttempts.Inc()
+	_, err = ec.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
 	if err != nil {
-		ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc()
-		return fmt.Errorf("create PVC %s: %v", pvcName, err)
+		metrics.ResourceClaimCreateFailures.Inc()
+		return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
 	}
 	return nil
 }
+
+func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name))
+	ctx = klog.NewContext(ctx, logger)
+	claim, err := ec.claimLister.ResourceClaims(namespace).Get(name)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			logger.V(5).Info("nothing to do for claim, it is gone")
+			return nil
+		}
+		return err
+	}
+
+	// Check if the ReservedFor entries are all still valid.
+	valid := make([]resourcev1alpha1.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor))
+	for _, reservedFor := range claim.Status.ReservedFor {
+		if reservedFor.APIGroup == "" &&
+			reservedFor.Resource == "pods" {
+			// A pod falls into one of three categories:
+			// - we have it in our cache -> don't remove it until we are told that it got removed
+			// - we don't have it in our cache anymore, but we have seen it before -> it was deleted, remove it
+			// - not in our cache, not seen -> double-check with API server before removal
+
+			keepEntry := true
+
+			// Tracking deleted pods in the LRU cache is an
+			// optimization. Without this cache, the code would
+			// have to do the API call below for every deleted pod
+			// to ensure that the pod really doesn't exist. With
+			// the cache, most of the time the pod will be recorded
+			// as deleted and the API call can be avoided.
+			if ec.deletedObjects.Has(reservedFor.UID) {
+				// We know that the pod was deleted. This is
+				// easy to check and thus is done first.
+				keepEntry = false
+			} else {
+				pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name)
+				if err != nil && !errors.IsNotFound(err) {
+					return err
+				}
+				if pod == nil {
+					// We might not have it in our informer cache
+					// yet. Removing the pod while the scheduler is
+					// scheduling it would be bad. We have to be
+					// absolutely sure and thus have to check with
+					// the API server.
+					pod, err := ec.kubeClient.CoreV1().Pods(claim.Namespace).Get(ctx, reservedFor.Name, metav1.GetOptions{})
+					if err != nil && !errors.IsNotFound(err) {
+						return err
+					}
+					if pod == nil || pod.UID != reservedFor.UID {
+						keepEntry = false
+					}
+				} else if pod.UID != reservedFor.UID {
+					// Pod exists, but is a different incarnation under the same name.
+					keepEntry = false
+				}
+			}
+
+			if keepEntry {
+				valid = append(valid, reservedFor)
+			}
+			continue
+		}
+
+		// TODO: support generic object lookup
+		return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor)
+	}
+
+	if len(valid) < len(claim.Status.ReservedFor) {
+		// TODO (#113700): patch
+		claim := claim.DeepCopy()
+		claim.Status.ReservedFor = valid
+		_, err := ec.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (=
+// namespace/name) for ResourceClaimTemplates in a given pod.
+func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {
+	pod, ok := obj.(*v1.Pod)
+	if !ok {
+		return []string{}, nil
+	}
+	keys := []string{}
+	for _, podClaim := range pod.Spec.ResourceClaims {
+		if podClaim.Source.ResourceClaimTemplateName != nil {
+			claimName := resourceclaim.Name(pod, &podClaim)
+			keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, claimName))
+		}
+	}
+	return keys, nil
+}
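Aside (not part of the patch): the controller multiplexes two item types on one work queue by prefixing the usual `namespace/name` keys. A small self-contained sketch of that encoding, using the `claimKeyPrefix`/`podKeyPrefix` constants introduced above; `splitKey` is a hypothetical helper mirroring the logic in `syncHandler`.

```go
package main

import (
	"fmt"
	"strings"

	"k8s.io/client-go/tools/cache"
)

const (
	claimKeyPrefix = "claim:"
	podKeyPrefix   = "pod:"
)

// splitKey reverses what enqueuePod does: "pod:ns/name" -> ("pod:", "ns", "name").
func splitKey(key string) (prefix, namespace, name string, err error) {
	sep := strings.Index(key, ":")
	if sep < 0 {
		return "", "", "", fmt.Errorf("unexpected key: %s", key)
	}
	prefix = key[0 : sep+1]
	namespace, name, err = cache.SplitMetaNamespaceKey(key[sep+1:])
	return prefix, namespace, name, err
}

func main() {
	// A pod with a claim template is queued under both kinds of keys:
	// once to create the claim, once to prune stale ReservedFor entries.
	for _, key := range []string{
		podKeyPrefix + "default/my-pod",
		claimKeyPrefix + "default/my-pod-my-claim",
	} {
		prefix, namespace, name, err := splitKey(key)
		fmt.Println(prefix, namespace, name, err)
	}
}
```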
diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go
index e6c8d6d396f..08adf0c8e5b 100644
--- a/pkg/controller/resourceclaim/controller_test.go
+++ b/pkg/controller/resourceclaim/controller_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package ephemeral
+package resourceclaim
 
 import (
 	"context"
@@ -22,38 +22,62 @@ import (
 	"sort"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	v1 "k8s.io/api/core/v1"
-	// storagev1 "k8s.io/api/storage/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	// "k8s.io/apimachinery/pkg/types"
+	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	k8stesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
-	kcache "k8s.io/client-go/tools/cache"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
-	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
-
-	"github.com/stretchr/testify/assert"
+	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
 )
 
 var (
-	testPodName         = "test-pod"
-	testNamespace       = "my-namespace"
-	testPodUID          = types.UID("uidpod1")
-	otherNamespace      = "not-my-namespace"
-	ephemeralVolumeName = "ephemeral-volume"
+	testPodName          = "test-pod"
+	testNamespace        = "my-namespace"
+	testPodUID           = types.UID("uidpod1")
+	otherNamespace       = "not-my-namespace"
+	podResourceClaimName = "acme-resource"
+	templateName         = "my-template"
+	className            = "my-resource-class"
 
-	testPod               = makePod(testPodName, testNamespace, testPodUID)
-	testPodWithEphemeral  = makePod(testPodName, testNamespace, testPodUID, *makeEphemeralVolume(ephemeralVolumeName))
-	testPodEphemeralClaim = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, makeOwnerReference(testPodWithEphemeral, true))
-	conflictingClaim      = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, nil)
-	otherNamespaceClaim   = makePVC(testPodName+"-"+ephemeralVolumeName, otherNamespace, nil)
+	testPod             = makePod(testPodName, testNamespace, testPodUID)
+	testPodWithResource = makePod(testPodName, testNamespace, testPodUID,
+		*makePodResourceClaim(podResourceClaimName, templateName))
+	otherTestPod = makePod(testPodName+"-II", testNamespace, testPodUID+"-II")
+	testClaim    = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true))
+	testClaimReserved = func() *resourcev1alpha1.ResourceClaim {
+		claim := testClaim.DeepCopy()
+		claim.Status.ReservedFor = append(claim.Status.ReservedFor,
+			resourcev1alpha1.ResourceClaimConsumerReference{
+				Resource: "pods",
+				Name:     testPodWithResource.Name,
+				UID:      testPodWithResource.UID,
+			},
+		)
+		return claim
+	}()
+	testClaimReservedTwice = func() *resourcev1alpha1.ResourceClaim {
+		claim := testClaimReserved.DeepCopy()
+		claim.Status.ReservedFor = append(claim.Status.ReservedFor,
+			resourcev1alpha1.ResourceClaimConsumerReference{
+				Resource: "pods",
+				Name:     otherTestPod.Name,
+				UID:      otherTestPod.UID,
+			},
+		)
+		return claim
+	}()
+	conflictingClaim    = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, nil)
+	otherNamespaceClaim = makeClaim(testPodName+"-"+podResourceClaimName, otherNamespace, className, nil)
+	template            = makeTemplate(templateName, testNamespace, className)
 )
 
 func init() {
@@ -63,104 +87,170 @@ func init() {
 
 func TestSyncHandler(t *testing.T) {
 	tests := []struct {
 		name            string
-		podKey          string
-		pvcs            []*v1.PersistentVolumeClaim
+		key             string
+		claims          []*resourcev1alpha1.ResourceClaim
 		pods            []*v1.Pod
-		expectedPVCs    []v1.PersistentVolumeClaim
+		podsLater       []*v1.Pod
+		templates       []*resourcev1alpha1.ResourceClaimTemplate
+		expectedClaims  []resourcev1alpha1.ResourceClaim
 		expectedError   bool
 		expectedMetrics expectedMetrics
 	}{
 		{
 			name:            "create",
-			pods:            []*v1.Pod{testPodWithEphemeral},
-			podKey:          podKey(testPodWithEphemeral),
-			expectedPVCs:    []v1.PersistentVolumeClaim{*testPodEphemeralClaim},
+			pods:            []*v1.Pod{testPodWithResource},
+			templates:       []*resourcev1alpha1.ResourceClaimTemplate{template},
+			key:             podKey(testPodWithResource),
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaim},
 			expectedMetrics: expectedMetrics{1, 0},
 		},
 		{
-			name:   "no-such-pod",
-			podKey: podKey(testPodWithEphemeral),
+			name:          "missing-template",
+			pods:          []*v1.Pod{testPodWithResource},
+			templates:     nil,
+			key:           podKey(testPodWithResource),
+			expectedError: true,
+		},
+		{
+			name:            "nop",
+			pods:            []*v1.Pod{testPodWithResource},
+			key:             podKey(testPodWithResource),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaim},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaim},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name: "no-such-pod",
+			key:  podKey(testPodWithResource),
 		},
 		{
 			name: "pod-deleted",
 			pods: func() []*v1.Pod {
 				deleted := metav1.Now()
-				pods := []*v1.Pod{testPodWithEphemeral.DeepCopy()}
+				pods := []*v1.Pod{testPodWithResource.DeepCopy()}
 				pods[0].DeletionTimestamp = &deleted
 				return pods
 			}(),
-			podKey: podKey(testPodWithEphemeral),
+			key: podKey(testPodWithResource),
 		},
 		{
-			name:   "no-volumes",
-			pods:   []*v1.Pod{testPod},
-			podKey: podKey(testPod),
+			name: "no-volumes",
+			pods: []*v1.Pod{testPod},
+			key:  podKey(testPod),
 		},
 		{
-			name:         "create-with-other-PVC",
-			pods:         []*v1.Pod{testPodWithEphemeral},
-			podKey:       podKey(testPodWithEphemeral),
-			pvcs:         []*v1.PersistentVolumeClaim{otherNamespaceClaim},
-			expectedPVCs: []v1.PersistentVolumeClaim{*otherNamespaceClaim, *testPodEphemeralClaim},
+			name:            "create-with-other-claim",
+			pods:            []*v1.Pod{testPodWithResource},
+			templates:       []*resourcev1alpha1.ResourceClaimTemplate{template},
+			key:             podKey(testPodWithResource),
+			claims:          []*resourcev1alpha1.ResourceClaim{otherNamespaceClaim},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*otherNamespaceClaim, *testClaim},
 			expectedMetrics: expectedMetrics{1, 0},
 		},
 		{
-			name:          "wrong-PVC-owner",
-			pods:          []*v1.Pod{testPodWithEphemeral},
-			podKey:        podKey(testPodWithEphemeral),
-			pvcs:          []*v1.PersistentVolumeClaim{conflictingClaim},
-			expectedPVCs:  []v1.PersistentVolumeClaim{*conflictingClaim},
-			expectedError: true,
+			name:           "wrong-claim-owner",
+			pods:           []*v1.Pod{testPodWithResource},
+			key:            podKey(testPodWithResource),
+			claims:         []*resourcev1alpha1.ResourceClaim{conflictingClaim},
+			expectedClaims: []resourcev1alpha1.ResourceClaim{*conflictingClaim},
+			expectedError:  true,
 		},
 		{
 			name:            "create-conflict",
-			pods:            []*v1.Pod{testPodWithEphemeral},
-			podKey:          podKey(testPodWithEphemeral),
+			pods:            []*v1.Pod{testPodWithResource},
+			templates:       []*resourcev1alpha1.ResourceClaimTemplate{template},
+			key:             podKey(testPodWithResource),
 			expectedMetrics: expectedMetrics{1, 1},
 			expectedError:   true,
 		},
+		{
+			name:            "stay-reserved-seen",
+			pods:            []*v1.Pod{testPodWithResource},
+			key:             claimKey(testClaimReserved),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReserved},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaimReserved},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name:            "stay-reserved-not-seen",
+			podsLater:       []*v1.Pod{testPodWithResource},
+			key:             claimKey(testClaimReserved),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReserved},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaimReserved},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name:            "clear-reserved",
+			pods:            []*v1.Pod{},
+			key:             claimKey(testClaimReserved),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReserved},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaim},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name:            "remove-reserved",
+			pods:            []*v1.Pod{testPod},
+			key:             claimKey(testClaimReservedTwice),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReservedTwice},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaimReserved},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
 	}
 
 	for _, tc := range tests {
 		// Run sequentially because of global logging and global metrics.
 		t.Run(tc.name, func(t *testing.T) {
-			// There is no good way to shut down the informers. They spawn
-			// various goroutines and some of them (in particular shared informer)
-			// become very unhappy ("close on closed channel") when using a context
-			// that gets cancelled. Therefore we just keep everything running.
-			ctx := context.Background()
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
 
 			var objects []runtime.Object
 			for _, pod := range tc.pods {
 				objects = append(objects, pod)
 			}
-			for _, pvc := range tc.pvcs {
-				objects = append(objects, pvc)
+			for _, claim := range tc.claims {
+				objects = append(objects, claim)
+			}
+			for _, template := range tc.templates {
+				objects = append(objects, template)
 			}
 
 			fakeKubeClient := createTestClient(objects...)
 			if tc.expectedMetrics.numFailures > 0 {
-				fakeKubeClient.PrependReactor("create", "persistentvolumeclaims", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+				fakeKubeClient.PrependReactor("create", "resourceclaims", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
 					return true, nil, apierrors.NewConflict(action.GetResource().GroupResource(), "fake name", errors.New("fake conflict"))
 				})
 			}
 			setupMetrics()
 			informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 			podInformer := informerFactory.Core().V1().Pods()
-			pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
+			claimInformer := informerFactory.Resource().V1alpha1().ResourceClaims()
+			templateInformer := informerFactory.Resource().V1alpha1().ResourceClaimTemplates()
 
-			c, err := NewController(fakeKubeClient, podInformer, pvcInformer)
+			ec, err := NewController(fakeKubeClient, podInformer, claimInformer, templateInformer)
 			if err != nil {
 				t.Fatalf("error creating ephemeral controller : %v", err)
 			}
-			ec, _ := c.(*ephemeralController)
 
 			// Ensure informers are up-to-date.
 			go informerFactory.Start(ctx.Done())
+			stopInformers := func() {
+				cancel()
+				informerFactory.Shutdown()
+			}
+			defer stopInformers()
 			informerFactory.WaitForCacheSync(ctx.Done())
-			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced)
+			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, claimInformer.Informer().HasSynced, templateInformer.Informer().HasSynced)
 
-			err = ec.syncHandler(context.TODO(), tc.podKey)
+			// Simulate race: stop informers, add more pods that the controller doesn't know about.
+			stopInformers()
+			for _, pod := range tc.podsLater {
+				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+				if err != nil {
+					t.Fatalf("unexpected error while creating pod: %v", err)
+				}
+			}
+
+			err = ec.syncHandler(context.TODO(), tc.key)
 			if err != nil && !tc.expectedError {
 				t.Fatalf("unexpected error while running handler: %v", err)
 			}
@@ -168,53 +258,68 @@ func TestSyncHandler(t *testing.T) {
 				t.Fatalf("unexpected success")
 			}
 
-			pvcs, err := fakeKubeClient.CoreV1().PersistentVolumeClaims("").List(ctx, metav1.ListOptions{})
+			claims, err := fakeKubeClient.ResourceV1alpha1().ResourceClaims("").List(ctx, metav1.ListOptions{})
 			if err != nil {
-				t.Fatalf("unexpected error while listing PVCs: %v", err)
+				t.Fatalf("unexpected error while listing claims: %v", err)
 			}
-			assert.Equal(t, sortPVCs(tc.expectedPVCs), sortPVCs(pvcs.Items))
+			assert.Equal(t, normalizeClaims(tc.expectedClaims), normalizeClaims(claims.Items))
 			expectMetrics(t, tc.expectedMetrics)
 		})
 	}
 }
 
-func makePVC(name, namespace string, owner *metav1.OwnerReference) *v1.PersistentVolumeClaim {
-	pvc := &v1.PersistentVolumeClaim{
+func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference) *resourcev1alpha1.ResourceClaim {
+	claim := &resourcev1alpha1.ResourceClaim{
 		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
-		Spec:       v1.PersistentVolumeClaimSpec{},
+		Spec: resourcev1alpha1.ResourceClaimSpec{
+			ResourceClassName: classname,
+		},
 	}
 	if owner != nil {
-		pvc.OwnerReferences = []metav1.OwnerReference{*owner}
+		claim.OwnerReferences = []metav1.OwnerReference{*owner}
 	}
-	return pvc
+	return claim
 }
 
-func makeEphemeralVolume(name string) *v1.Volume {
-	return &v1.Volume{
+func makePodResourceClaim(name, templateName string) *v1.PodResourceClaim {
+	return &v1.PodResourceClaim{
 		Name: name,
-		VolumeSource: v1.VolumeSource{
-			Ephemeral: &v1.EphemeralVolumeSource{
-				VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{},
+		Source: v1.ClaimSource{
+			ResourceClaimTemplateName: &templateName,
+		},
+	}
+}
+
+func makePod(name, namespace string, uid types.UID, podClaims ...v1.PodResourceClaim) *v1.Pod {
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, UID: uid},
+		Spec: v1.PodSpec{
+			ResourceClaims: podClaims,
+		},
+	}
+
+	return pod
+}
+
+func makeTemplate(name, namespace, classname string) *resourcev1alpha1.ResourceClaimTemplate {
+	template := &resourcev1alpha1.ResourceClaimTemplate{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
+		Spec: resourcev1alpha1.ResourceClaimTemplateSpec{
+			Spec: resourcev1alpha1.ResourceClaimSpec{
+				ResourceClassName: classname,
 			},
 		},
 	}
-}
-
-func makePod(name, namespace string, uid types.UID, volumes ...v1.Volume) *v1.Pod {
-	pvc := &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, UID: uid},
-		Spec: v1.PodSpec{
-			Volumes: volumes,
-		},
-	}
-
-	return pvc
+	return template
 }
 
 func podKey(pod *v1.Pod) string {
-	key, _ := kcache.DeletionHandlingMetaNamespaceKeyFunc(testPodWithEphemeral)
-	return key
+	return podKeyPrefix + pod.Namespace + "/" + pod.Name
+}
+
+func claimKey(claim *resourcev1alpha1.ResourceClaim) string {
+	return claimKeyPrefix + claim.Namespace + "/" + claim.Name
 }
 
 func makeOwnerReference(pod *v1.Pod, isController bool) *metav1.OwnerReference {
@@ -229,12 +334,17 @@ func makeOwnerReference(pod *v1.Pod, isController bool) *metav1.OwnerReference {
 	}
 }
 
-func sortPVCs(pvcs []v1.PersistentVolumeClaim) []v1.PersistentVolumeClaim {
-	sort.Slice(pvcs, func(i, j int) bool {
-		return pvcs[i].Namespace < pvcs[j].Namespace ||
-			pvcs[i].Name < pvcs[j].Name
+func normalizeClaims(claims []resourcev1alpha1.ResourceClaim) []resourcev1alpha1.ResourceClaim {
+	sort.Slice(claims, func(i, j int) bool {
+		return claims[i].Namespace < claims[j].Namespace ||
+			claims[i].Name < claims[j].Name
 	})
-	return pvcs
+	for i := range claims {
+		if len(claims[i].Status.ReservedFor) == 0 {
+			claims[i].Status.ReservedFor = nil
+		}
+	}
+	return claims
 }
 
 func createTestClient(objects ...runtime.Object) *fake.Clientset {
@@ -252,15 +362,15 @@ type expectedMetrics struct {
 
 func expectMetrics(t *testing.T, em expectedMetrics) {
 	t.Helper()
 
-	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateAttempts)
-	handleErr(t, err, "ephemeralVolumeCreate")
+	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateAttempts)
+	handleErr(t, err, "ResourceClaimCreate")
 	if actualCreated != float64(em.numCreated) {
-		t.Errorf("Expected PVCs to be created %d, got %v", em.numCreated, actualCreated)
+		t.Errorf("Expected claims to be created %d, got %v", em.numCreated, actualCreated)
 	}
-	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateFailures)
-	handleErr(t, err, "ephemeralVolumeCreate/Conflict")
+	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateFailures)
+	handleErr(t, err, "ResourceClaimCreate/Conflict")
 	if actualConflicts != float64(em.numFailures) {
-		t.Errorf("Expected PVCs to have conflicts %d, got %v", em.numFailures, actualConflicts)
+		t.Errorf("Expected claims to have conflicts %d, got %v", em.numFailures, actualConflicts)
 	}
 }
 
@@ -272,6 +382,6 @@ func handleErr(t *testing.T, err error, metricName string) {
 
 func setupMetrics() {
 	ephemeralvolumemetrics.RegisterMetrics()
-	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Reset()
-	ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Reset()
+	ephemeralvolumemetrics.ResourceClaimCreateAttempts.Reset()
+	ephemeralvolumemetrics.ResourceClaimCreateFailures.Reset()
 }
diff --git a/pkg/controller/resourceclaim/doc.go b/pkg/controller/resourceclaim/doc.go
index ae45cbaad1d..39257887901 100644
--- a/pkg/controller/resourceclaim/doc.go
+++ b/pkg/controller/resourceclaim/doc.go
@@ -14,8 +14,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package ephemeral implements the controller part of
-// https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1698-generic-ephemeral-volumes
+// Package resourceclaim implements the controller part of
+// https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/3063-dynamic-resource-allocation
 //
-// It was derived from the expand controller.
-package ephemeral
+// It was derived from the generic ephemeral volume controller.
+package resourceclaim
diff --git a/pkg/controller/resourceclaim/metrics/OWNERS b/pkg/controller/resourceclaim/metrics/OWNERS
new file mode 100644
index 00000000000..77fbdc8e594
--- /dev/null
+++ b/pkg/controller/resourceclaim/metrics/OWNERS
@@ -0,0 +1,6 @@
+# See the OWNERS docs at https://go.k8s.io/owners
+
+reviewers:
+  - sig-instrumentation-reviewers
+labels:
+  - sig/instrumentation
diff --git a/pkg/controller/resourceclaim/metrics/metrics.go b/pkg/controller/resourceclaim/metrics/metrics.go
index 20eb9f1f6e4..17c5496cd15 100644
--- a/pkg/controller/resourceclaim/metrics/metrics.go
+++ b/pkg/controller/resourceclaim/metrics/metrics.go
@@ -23,36 +23,36 @@ import (
 	"k8s.io/component-base/metrics/legacyregistry"
 )
 
-// EphemeralVolumeSubsystem - subsystem name used for Endpoint Slices.
-const EphemeralVolumeSubsystem = "ephemeral_volume_controller"
+// ResourceClaimSubsystem - subsystem name used for ResourceClaim creation metrics.
+const ResourceClaimSubsystem = "resourceclaim_controller"
 
 var (
-	// EphemeralVolumeCreateAttempts tracks the number of
-	// PersistentVolumeClaims().Create calls (both successful and unsuccessful)
-	EphemeralVolumeCreateAttempts = metrics.NewCounter(
+	// ResourceClaimCreateAttempts tracks the number of
+	// ResourceClaims().Create calls (both successful and unsuccessful)
+	ResourceClaimCreateAttempts = metrics.NewCounter(
 		&metrics.CounterOpts{
-			Subsystem:      EphemeralVolumeSubsystem,
-			Name:           "create_total",
-			Help:           "Number of PersistenVolumeClaims creation requests",
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "create_attempts_total",
+			Help:           "Number of ResourceClaim creation requests (both successful and unsuccessful)",
 			StabilityLevel: metrics.ALPHA,
 		})
-	// EphemeralVolumeCreateFailures tracks the number of unsuccessful
-	// PersistentVolumeClaims().Create calls
-	EphemeralVolumeCreateFailures = metrics.NewCounter(
+	// ResourceClaimCreateFailures tracks the number of unsuccessful
+	// ResourceClaims().Create calls
+	ResourceClaimCreateFailures = metrics.NewCounter(
 		&metrics.CounterOpts{
-			Subsystem:      EphemeralVolumeSubsystem,
+			Subsystem:      ResourceClaimSubsystem,
 			Name:           "create_failures_total",
-			Help:           "Number of PersistenVolumeClaims creation requests",
+			Help:           "Number of failed ResourceClaim creation requests",
 			StabilityLevel: metrics.ALPHA,
 		})
 )
 
 var registerMetrics sync.Once
 
-// RegisterMetrics registers EphemeralVolume metrics.
+// RegisterMetrics registers ResourceClaim metrics.
 func RegisterMetrics() {
 	registerMetrics.Do(func() {
-		legacyregistry.MustRegister(EphemeralVolumeCreateAttempts)
-		legacyregistry.MustRegister(EphemeralVolumeCreateFailures)
+		legacyregistry.MustRegister(ResourceClaimCreateAttempts)
+		legacyregistry.MustRegister(ResourceClaimCreateFailures)
 	})
 }
diff --git a/pkg/controller/resourceclaim/uid_cache.go b/pkg/controller/resourceclaim/uid_cache.go
new file mode 100644
index 00000000000..b07caa5fb3c
--- /dev/null
+++ b/pkg/controller/resourceclaim/uid_cache.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceclaim
+
+import (
+	"sync"
+
+	"github.com/golang/groupcache/lru"
+
+	"k8s.io/apimachinery/pkg/types"
+)
+
+// uidCache is a thread-safe LRU cache of UIDs.
+type uidCache struct {
+	mutex sync.Mutex
+	cache *lru.Cache
+}
+
+// newUIDCache returns a uidCache that holds at most maxCacheEntries UIDs.
+func newUIDCache(maxCacheEntries int) *uidCache {
+	return &uidCache{
+		cache: lru.New(maxCacheEntries),
+	}
+}
+
+// Add adds a uid to the cache.
+func (c *uidCache) Add(uid types.UID) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.cache.Add(uid, nil)
+}
+
+// Has reports whether a uid is in the cache.
+func (c *uidCache) Has(uid types.UID) bool {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	_, found := c.cache.Get(uid)
+	return found
+}
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
index a0e93659f63..cbae6c311aa 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
@@ -205,6 +205,19 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
 		},
 	})
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+		addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
+			ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "resource-claim-controller"},
+			Rules: []rbacv1.PolicyRule{
+				rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
+				rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(),
+				rbacv1helpers.NewRule("get", "list", "watch", "create").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(),
+				rbacv1helpers.NewRule("update", "patch").Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(),
+				eventsRule(),
+			},
+		})
+	}
+
 	addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
 		ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "generic-garbage-collector"},
 		Rules: []rbacv1.PolicyRule{
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
index 85ef7b414f0..13266214bba 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
@@ -54,6 +54,7 @@ const (
 	extensionsGroup = "extensions"
 	policyGroup        = "policy"
 	rbacGroup          = "rbac.authorization.k8s.io"
+	resourceGroup      = "resource.k8s.io"
 	storageGroup       = "storage.k8s.io"
 	resMetricsGroup    = "metrics.k8s.io"
 	customMetricsGroup = "custom.metrics.k8s.io"
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 6ee3ad6866f..97707583e6f 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -2053,6 +2053,9 @@ k8s.io/cri-api/pkg/errors
 ## explicit; go 1.19
 k8s.io/csi-translation-lib
 k8s.io/csi-translation-lib/plugins
+# k8s.io/dynamic-resource-allocation v0.0.0 => ./staging/src/k8s.io/dynamic-resource-allocation
+## explicit; go 1.19
+k8s.io/dynamic-resource-allocation/resourceclaim
 # k8s.io/gengo v0.0.0-20220902162205-c0856e24416d
 ## explicit; go 1.13
 k8s.io/gengo/args
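
Note on the new uid_cache.go above: github.com/golang/groupcache/lru is not safe for concurrent use, which is why every method takes the mutex, and lru.Cache.Get also refreshes an entry's recency, so Has keeps frequently checked UIDs cached longer. The following standalone sketch demonstrates those semantics. The package main wrapper and the example UID values are invented for illustration; the real type is unexported in package resourceclaim, so treat this as a usage sketch rather than code from this change.

package main

import (
	"fmt"
	"sync"

	"github.com/golang/groupcache/lru"

	"k8s.io/apimachinery/pkg/types"
)

// uidCache mirrors pkg/controller/resourceclaim/uid_cache.go: a bounded,
// mutex-guarded set of UIDs with least-recently-used eviction.
type uidCache struct {
	mutex sync.Mutex
	cache *lru.Cache
}

func newUIDCache(maxCacheEntries int) *uidCache {
	return &uidCache{cache: lru.New(maxCacheEntries)}
}

// Add records a uid, evicting the least recently used entry once the
// cache is full.
func (c *uidCache) Add(uid types.UID) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.cache.Add(uid, nil)
}

// Has reports whether uid is still cached. The underlying Get call also
// marks the entry as recently used.
func (c *uidCache) Has(uid types.UID) bool {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	_, found := c.cache.Get(uid)
	return found
}

func main() {
	c := newUIDCache(2) // capacity of two entries, chosen only for this demo

	c.Add(types.UID("uid-a"))
	c.Add(types.UID("uid-b"))
	c.Add(types.UID("uid-c")) // evicts uid-a, the least recently used entry

	fmt.Println(c.Has(types.UID("uid-a"))) // false: already evicted
	fmt.Println(c.Has(types.UID("uid-c"))) // true
}

The bound matters because the controller must not accumulate an entry for every UID it ever observes; once the capacity passed to newUIDCache is exceeded, the oldest entries silently age out, so callers have to treat a negative Has result as "possibly forgotten" rather than "never added".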