kube-controller-manager: add ResourceClaim controller

The controller uses the same logic as the generic ephemeral inline volume
controller, just applied to inline ResourceClaimTemplate -> ResourceClaim creation.
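In other words, for each entry in pod.Spec.ResourceClaims that references a
ResourceClaimTemplate, the controller creates a ResourceClaim owned by the pod,
just as the ephemeral volume controller creates one PVC per inline volume. A
minimal sketch of the naming convention this implies, assuming (as the unit
tests below do) that the generated claim is named <pod name>-<pod claim name>;
claimNameForPod is a hypothetical helper written for illustration, not part of
this commit:

package sketch

import (
	v1 "k8s.io/api/core/v1"
)

// claimNameForPod mirrors what resourceclaim.Name appears to do for the
// template case: the generated ResourceClaim is named after the pod plus
// the pod-level claim entry.
func claimNameForPod(pod *v1.Pod, podClaim *v1.PodResourceClaim) string {
	return pod.Name + "-" + podClaim.Name // e.g. "test-pod-acme-resource"
}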

In addition, it supports removing pods from a claim's ReservedFor field once
those pods are known to no longer need the claim. At the moment, only this
special case is supported: removing arbitrary objects would imply granting
full read access to all types, to determine whether a) an object is gone and
b) the current incarnation is the one listed in ReservedFor. Support for that
may get added later.
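A condensed sketch of that special case, hand-written for this description
(the authoritative version is syncClaim in the diff below; knownDeleted and
lookup stand in for the controller's LRU cache of deleted pod UIDs and its
informer/API-server reads):

package sketch

import (
	v1 "k8s.io/api/core/v1"
	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
	"k8s.io/apimachinery/pkg/types"
)

// keepPodEntry reports whether a ReservedFor entry referencing a pod is
// still valid. knownDeleted consults a cache of UIDs of pods which were
// observed to be deleted; lookup returns the current pod with that name,
// or nil when there is none.
func keepPodEntry(ref resourcev1alpha1.ResourceClaimConsumerReference,
	knownDeleted func(types.UID) bool,
	lookup func(name string) (*v1.Pod, error)) (bool, error) {
	if knownDeleted(ref.UID) {
		return false, nil // the pod existed and is gone -> drop the entry
	}
	pod, err := lookup(ref.Name)
	if err != nil {
		return false, err // cannot decide now, retry later
	}
	if pod == nil || pod.UID != ref.UID {
		// Gone, or a different incarnation under the same name.
		return false, nil
	}
	return true, nil
}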
commit 0133df3929
parent b87530af4f
Author: Patrick Ohly
Date:   2022-03-22 16:56:49 +01:00
23 changed files with 624 additions and 545 deletions


@@ -475,6 +475,9 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
 		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
 		register("storage-version-gc", startStorageVersionGCController)
 	}
+	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
+		controllers["resource-claim-controller"] = startResourceClaimController
+	}
 	return controllers
 }


@@ -54,6 +54,7 @@ import (
 	lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle"
 	"k8s.io/kubernetes/pkg/controller/podgc"
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim"
 	resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
 	serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
 	"k8s.io/kubernetes/pkg/controller/storageversiongc"
@@ -357,6 +358,21 @@ func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
 	return nil, true, nil
 }
+
+const defaultResourceClaimControllerWorkers = 10
+
+func startResourceClaimController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
+	ephemeralController, err := resourceclaim.NewController(
+		controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"),
+		controllerContext.InformerFactory.Core().V1().Pods(),
+		controllerContext.InformerFactory.Resource().V1alpha1().ResourceClaims(),
+		controllerContext.InformerFactory.Resource().V1alpha1().ResourceClaimTemplates())
+	if err != nil {
+		return nil, true, fmt.Errorf("failed to start resource claim controller: %v", err)
+	}
+	go ephemeralController.Run(ctx, defaultResourceClaimControllerWorkers)
+	return nil, true, nil
+}
+
 func startEndpointController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) {
 	go endpointcontroller.NewEndpointController(
 		controllerCtx.InformerFactory.Core().V1().Pods(),


@@ -33,6 +33,7 @@ import (
 	_ "k8s.io/kubernetes/pkg/apis/extensions/install"
 	_ "k8s.io/kubernetes/pkg/apis/policy/install"
 	_ "k8s.io/kubernetes/pkg/apis/rbac/install"
+	_ "k8s.io/kubernetes/pkg/apis/resource/install"
 	_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
 	_ "k8s.io/kubernetes/pkg/apis/storage/install"
 )

go.mod

@@ -107,6 +107,7 @@ require (
 	k8s.io/controller-manager v0.0.0
 	k8s.io/cri-api v0.0.0
 	k8s.io/csi-translation-lib v0.0.0
+	k8s.io/dynamic-resource-allocation v0.0.0
 	k8s.io/gengo v0.0.0-20220902162205-c0856e24416d
 	k8s.io/klog/v2 v2.80.1
 	k8s.io/kms v0.0.0


@@ -1,6 +1,12 @@
 # See the OWNERS docs at https://go.k8s.io/owners
 
 approvers:
-  - saad-ali
-  - jsafrane
+  - sig-node-approvers
+  - klueska
   - pohly
+reviewers:
+  - klueska
+  - pohly
+  - bart0sh
+labels:
+  - sig/node


@@ -1,19 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
package config // import "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"


@@ -1,25 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
// EphemeralVolumeControllerConfiguration contains elements describing EphemeralVolumeController.
type EphemeralVolumeControllerConfiguration struct {
// ConcurrentEphemeralVolumeSyncs is the number of ephemeral volume syncing operations
// that will be done concurrently. Larger number = faster ephemeral volume updating,
// but more CPU (and network) load.
ConcurrentEphemeralVolumeSyncs int32
}


@@ -1,40 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/kube-controller-manager/config/v1alpha1"
"k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"
)
// Important! The public back-and-forth conversion functions for the types in this package
// with EphemeralVolumeControllerConfiguration types need to be manually exposed like this in order for
// other packages that reference this package to be able to call these conversion functions
// in an autogenerated manner.
// TODO: Fix the bug in conversion-gen so it automatically discovers these Convert_* functions
// in autogenerated code as well.
// Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration is an autogenerated conversion function.
func Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in *v1alpha1.EphemeralVolumeControllerConfiguration, out *config.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
return autoConvert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in, out, s)
}
// Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration is an autogenerated conversion function.
func Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in *config.EphemeralVolumeControllerConfiguration, out *v1alpha1.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
return autoConvert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in, out, s)
}


@@ -1,36 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
kubectrlmgrconfigv1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1"
)
// RecommendedDefaultEphemeralVolumeControllerConfiguration defaults a pointer to a
// EphemeralVolumeControllerConfiguration struct. This will set the recommended default
// values, but they may be subject to change between API versions. This function
// is intentionally not registered in the scheme as a "normal" `SetDefaults_Foo`
// function to allow consumers of this type to set whatever defaults for their
// embedded configs. Forcing consumers to use these defaults would be problematic
// as defaulting in the scheme is done as part of the conversion, and there would
// be no easy way to opt-out. Instead, if you want to use this defaulting method
// run it in your wrapper struct of this type in its `SetDefaults_` method.
func RecommendedDefaultEphemeralVolumeControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.EphemeralVolumeControllerConfiguration) {
if obj.ConcurrentEphemeralVolumeSyncs == 0 {
obj.ConcurrentEphemeralVolumeSyncs = 5
}
}


@@ -1,21 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
// +k8s:conversion-gen=k8s.io/kubernetes/pkg/controller/volume/ephemeral/config
// +k8s:conversion-gen-external-types=k8s.io/kube-controller-manager/config/v1alpha1
package v1alpha1 // import "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config/v1alpha1"


@@ -1,31 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"k8s.io/apimachinery/pkg/runtime"
)
var (
// SchemeBuilder is the scheme builder with scheme init functions to run for this API package
SchemeBuilder runtime.SchemeBuilder
// localSchemeBuilder extends the SchemeBuilder instance with the external types. In this package,
// defaulting and conversion init funcs are registered as well.
localSchemeBuilder = &SchemeBuilder
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = localSchemeBuilder.AddToScheme
)


@@ -1,92 +0,0 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by conversion-gen. DO NOT EDIT.
package v1alpha1
import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
v1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1"
config "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"
)
func init() {
localSchemeBuilder.Register(RegisterConversions)
}
// RegisterConversions adds conversion functions to the given scheme.
// Public to allow building arbitrary schemes.
func RegisterConversions(s *runtime.Scheme) error {
if err := s.AddGeneratedConversionFunc((*v1alpha1.GroupResource)(nil), (*v1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_GroupResource_To_v1_GroupResource(a.(*v1alpha1.GroupResource), b.(*v1.GroupResource), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1.GroupResource)(nil), (*v1alpha1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1_GroupResource_To_v1alpha1_GroupResource(a.(*v1.GroupResource), b.(*v1alpha1.GroupResource), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*config.EphemeralVolumeControllerConfiguration)(nil), (*v1alpha1.EphemeralVolumeControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(a.(*config.EphemeralVolumeControllerConfiguration), b.(*v1alpha1.EphemeralVolumeControllerConfiguration), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*v1alpha1.EphemeralVolumeControllerConfiguration)(nil), (*config.EphemeralVolumeControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(a.(*v1alpha1.EphemeralVolumeControllerConfiguration), b.(*config.EphemeralVolumeControllerConfiguration), scope)
}); err != nil {
return err
}
return nil
}
func autoConvert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in *v1alpha1.EphemeralVolumeControllerConfiguration, out *config.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
out.ConcurrentEphemeralVolumeSyncs = in.ConcurrentEphemeralVolumeSyncs
return nil
}
func autoConvert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in *config.EphemeralVolumeControllerConfiguration, out *v1alpha1.EphemeralVolumeControllerConfiguration, s conversion.Scope) error {
out.ConcurrentEphemeralVolumeSyncs = in.ConcurrentEphemeralVolumeSyncs
return nil
}
func autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error {
out.Group = in.Group
out.Resource = in.Resource
return nil
}
// Convert_v1alpha1_GroupResource_To_v1_GroupResource is an autogenerated conversion function.
func Convert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error {
return autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in, out, s)
}
func autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error {
out.Group = in.Group
out.Resource = in.Resource
return nil
}
// Convert_v1_GroupResource_To_v1alpha1_GroupResource is an autogenerated conversion function.
func Convert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error {
return autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in, out, s)
}


@@ -1,22 +0,0 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package v1alpha1


@@ -1,38 +0,0 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package config
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EphemeralVolumeControllerConfiguration) DeepCopyInto(out *EphemeralVolumeControllerConfiguration) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EphemeralVolumeControllerConfiguration.
func (in *EphemeralVolumeControllerConfiguration) DeepCopy() *EphemeralVolumeControllerConfiguration {
if in == nil {
return nil
}
out := new(EphemeralVolumeControllerConfiguration)
in.DeepCopyInto(out)
return out
}


@@ -14,58 +14,69 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package ephemeral
+package resourceclaim
 
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
-	"k8s.io/klog/v2"
-
 	v1 "k8s.io/api/core/v1"
+	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	coreinformers "k8s.io/client-go/informers/core/v1"
+	v1informers "k8s.io/client-go/informers/core/v1"
+	resourcev1alpha1informers "k8s.io/client-go/informers/resource/v1alpha1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
-	corelisters "k8s.io/client-go/listers/core/v1"
+	v1listers "k8s.io/client-go/listers/core/v1"
+	resourcev1alpha1listers "k8s.io/client-go/listers/resource/v1alpha1"
 	"k8s.io/client-go/tools/cache"
-	kcache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
-	"k8s.io/component-helpers/storage/ephemeral"
-	"k8s.io/kubernetes/pkg/controller/volume/common"
-	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
-	"k8s.io/kubernetes/pkg/controller/volume/events"
+	"k8s.io/dynamic-resource-allocation/resourceclaim"
+	"k8s.io/klog/v2"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
 )
 
-// Controller creates PVCs for ephemeral inline volumes in a pod spec.
-type Controller interface {
-	Run(ctx context.Context, workers int)
-}
+const (
+	// podResourceClaimIndex is the lookup name for the index function which indexes by pod ResourceClaim templates.
+	podResourceClaimIndex = "pod-resource-claim-index"
+
+	maxUIDCacheEntries = 500
+)
 
-type ephemeralController struct {
-	// kubeClient is the kube API client used by volumehost to communicate with
-	// the API server.
+// Controller creates ResourceClaims for ResourceClaimTemplates in a pod spec.
+type Controller struct {
+	// kubeClient is the kube API client used to communicate with the API
+	// server.
 	kubeClient clientset.Interface
 
-	// pvcLister is the shared PVC lister used to fetch and store PVC
+	// claimLister is the shared ResourceClaim lister used to fetch and store ResourceClaim
 	// objects from the API server. It is shared with other controllers and
-	// therefore the PVC objects in its store should be treated as immutable.
-	pvcLister  corelisters.PersistentVolumeClaimLister
-	pvcsSynced kcache.InformerSynced
+	// therefore the ResourceClaim objects in its store should be treated as immutable.
+	claimLister  resourcev1alpha1listers.ResourceClaimLister
+	claimsSynced cache.InformerSynced
 
 	// podLister is the shared Pod lister used to fetch Pod
 	// objects from the API server. It is shared with other controllers and
 	// therefore the Pod objects in its store should be treated as immutable.
-	podLister corelisters.PodLister
-	podSynced kcache.InformerSynced
+	podLister v1listers.PodLister
+	podSynced cache.InformerSynced
 
-	// podIndexer has the common PodPVC indexer indexer installed To
+	// templateLister is the shared ResourceClaimTemplate lister used to
+	// fetch template objects from the API server. It is shared with other
+	// controllers and therefore the objects in its store should be treated
+	// as immutable.
+	templateLister  resourcev1alpha1listers.ResourceClaimTemplateLister
+	templatesSynced cache.InformerSynced
+
+	// podIndexer has the common PodResourceClaim indexer installed to
 	// limit iteration over pods to those of interest.
 	podIndexer cache.Indexer
@@ -73,104 +84,158 @@ type ephemeralController struct {
 	recorder record.EventRecorder
 
 	queue workqueue.RateLimitingInterface
+
+	// The deletedObjects cache keeps track of Pods for which we know that
+	// they have existed and have been removed. For those we can be sure
+	// that a ReservedFor entry needs to be removed.
+	deletedObjects *uidCache
 }
 
-// NewController creates an ephemeral volume controller.
+const (
+	claimKeyPrefix = "claim:"
+	podKeyPrefix   = "pod:"
+)
+
+// NewController creates a ResourceClaim controller.
 func NewController(
 	kubeClient clientset.Interface,
-	podInformer coreinformers.PodInformer,
-	pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
+	podInformer v1informers.PodInformer,
+	claimInformer resourcev1alpha1informers.ResourceClaimInformer,
+	templateInformer resourcev1alpha1informers.ResourceClaimTemplateInformer) (*Controller, error) {
 
-	ec := &ephemeralController{
-		kubeClient: kubeClient,
-		podLister:  podInformer.Lister(),
-		podIndexer: podInformer.Informer().GetIndexer(),
-		podSynced:  podInformer.Informer().HasSynced,
-		pvcLister:  pvcInformer.Lister(),
-		pvcsSynced: pvcInformer.Informer().HasSynced,
-		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"),
+	ec := &Controller{
+		kubeClient:      kubeClient,
+		podLister:       podInformer.Lister(),
+		podIndexer:      podInformer.Informer().GetIndexer(),
+		podSynced:       podInformer.Informer().HasSynced,
+		claimLister:     claimInformer.Lister(),
+		claimsSynced:    claimInformer.Informer().HasSynced,
+		templateLister:  templateInformer.Lister(),
+		templatesSynced: templateInformer.Informer().HasSynced,
+		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
+		deletedObjects:  newUIDCache(maxUIDCacheEntries),
 	}
 
-	ephemeralvolumemetrics.RegisterMetrics()
+	metrics.RegisterMetrics()
 
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(klog.Infof)
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
-	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})
+	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
 
-	podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
-		AddFunc: ec.enqueuePod,
-		// The pod spec is immutable. Therefore the controller can ignore pod updates
-		// because there cannot be any changes that have to be copied into the generated
-		// PVC.
-		// Deletion of the PVC is handled through the owner reference and garbage collection.
-		// Therefore pod deletions also can be ignored.
-	})
-	pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
-		DeleteFunc: ec.onPVCDelete,
-	})
-	if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil {
-		return nil, fmt.Errorf("could not initialize pvc protection controller: %w", err)
+	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			ec.enqueuePod(obj, false)
+		},
+		UpdateFunc: func(old, updated interface{}) {
+			ec.enqueuePod(updated, false)
+		},
+		DeleteFunc: func(obj interface{}) {
+			ec.enqueuePod(obj, true)
+		},
+	}); err != nil {
+		return nil, err
+	}
+	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: ec.onResourceClaimAddOrUpdate,
+		UpdateFunc: func(old, updated interface{}) {
+			ec.onResourceClaimAddOrUpdate(updated)
+		},
+		DeleteFunc: ec.onResourceClaimDelete,
+	}); err != nil {
+		return nil, err
+	}
+	if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil {
+		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
 	}
 
 	return ec, nil
 }
 
-func (ec *ephemeralController) enqueuePod(obj interface{}) {
+func (ec *Controller) enqueuePod(obj interface{}, deleted bool) {
+	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+		obj = d.Obj
+	}
 	pod, ok := obj.(*v1.Pod)
 	if !ok {
+		// Not a pod?!
 		return
 	}
 
-	// Ignore pods which are already getting deleted.
-	if pod.DeletionTimestamp != nil {
+	if deleted {
+		ec.deletedObjects.Add(pod.UID)
+	}
+
+	if len(pod.Spec.ResourceClaims) == 0 {
+		// Nothing to do for it at all.
 		return
 	}
 
-	for _, vol := range pod.Spec.Volumes {
-		if vol.Ephemeral != nil {
-			// It has at least one ephemeral inline volume, work on it.
-			key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pod)
-			if err != nil {
-				runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err))
-				return
+	// Release reservations of a deleted or completed pod?
+	if deleted ||
+		podutil.IsPodTerminal(pod) ||
+		// Deleted and not scheduled:
+		pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" {
+		for _, podClaim := range pod.Spec.ResourceClaims {
+			claimName := resourceclaim.Name(pod, &podClaim)
+			ec.queue.Add(claimKeyPrefix + pod.Namespace + "/" + claimName)
+		}
+	}
+
+	// Create ResourceClaim for inline templates?
+	if pod.DeletionTimestamp == nil {
+		for _, podClaim := range pod.Spec.ResourceClaims {
+			if podClaim.Source.ResourceClaimTemplateName != nil {
+				// It has at least one inline template, work on it.
+				ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
+				break
 			}
-			ec.queue.Add(key)
-			break
 		}
 	}
 }
 
-func (ec *ephemeralController) onPVCDelete(obj interface{}) {
-	pvc, ok := obj.(*v1.PersistentVolumeClaim)
+func (ec *Controller) onResourceClaimAddOrUpdate(obj interface{}) {
+	claim, ok := obj.(*resourcev1alpha1.ResourceClaim)
 	if !ok {
 		return
 	}
 
-	// Someone deleted a PVC, either intentionally or
+	// When starting up, we have to check all claims to find those with
+	// stale pods in ReservedFor. During an update, a pod might get added
+	// that already no longer exists.
+	ec.queue.Add(claimKeyPrefix + claim.Namespace + "/" + claim.Name)
+}
+
+func (ec *Controller) onResourceClaimDelete(obj interface{}) {
+	claim, ok := obj.(*resourcev1alpha1.ResourceClaim)
+	if !ok {
+		return
+	}
+
+	// Someone deleted a ResourceClaim, either intentionally or
 	// accidentally. If there is a pod referencing it because of
-	// an ephemeral volume, then we should re-create the PVC.
+	// an inline resource, then we should re-create the ResourceClaim.
 	// The common indexer does some prefiltering for us by
 	// limiting the list to those pods which reference
-	// the PVC.
-	objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
+	// the ResourceClaim.
+	objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
 	if err != nil {
 		runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
 		return
 	}
 	for _, obj := range objs {
-		ec.enqueuePod(obj)
+		ec.enqueuePod(obj, false)
 	}
 }
 
-func (ec *ephemeralController) Run(ctx context.Context, workers int) {
+func (ec *Controller) Run(ctx context.Context, workers int) {
 	defer runtime.HandleCrash()
 	defer ec.queue.ShutDown()
 
 	klog.Infof("Starting ephemeral volume controller")
 	defer klog.Infof("Shutting down ephemeral volume controller")
 
-	if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.pvcsSynced) {
+	if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.claimsSynced) {
 		return
 	}
@@ -181,12 +246,12 @@ func (ec *ephemeralController) Run(ctx context.Context, workers int) {
 
 	<-ctx.Done()
 }
 
-func (ec *ephemeralController) runWorker(ctx context.Context) {
+func (ec *Controller) runWorker(ctx context.Context) {
 	for ec.processNextWorkItem(ctx) {
 	}
 }
 
-func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
+func (ec *Controller) processNextWorkItem(ctx context.Context) bool {
 	key, shutdown := ec.queue.Get()
 	if shutdown {
 		return false
@@ -205,65 +270,92 @@ func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
 	return true
 }
 
-// syncHandler is invoked for each pod which might need to be processed.
-// If an error is returned from this function, the pod will be requeued.
-func (ec *ephemeralController) syncHandler(ctx context.Context, key string) error {
-	namespace, name, err := kcache.SplitMetaNamespaceKey(key)
+// syncHandler is invoked for each work item which might need to be processed.
+// If an error is returned from this function, the item will be requeued.
+func (ec *Controller) syncHandler(ctx context.Context, key string) error {
+	sep := strings.Index(key, ":")
+	if sep < 0 {
+		return fmt.Errorf("unexpected key: %s", key)
+	}
+	prefix, object := key[0:sep+1], key[sep+1:]
+	namespace, name, err := cache.SplitMetaNamespaceKey(object)
 	if err != nil {
 		return err
 	}
+
+	switch prefix {
+	case podKeyPrefix:
+		return ec.syncPod(ctx, namespace, name)
+	case claimKeyPrefix:
+		return ec.syncClaim(ctx, namespace, name)
+	default:
+		return fmt.Errorf("unexpected key prefix: %s", prefix)
+	}
+}
+
+func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KRef(namespace, name))
+	ctx = klog.NewContext(ctx, logger)
 	pod, err := ec.podLister.Pods(namespace).Get(name)
 	if err != nil {
 		if errors.IsNotFound(err) {
-			klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is gone", key)
+			logger.V(5).Info("nothing to do for pod, it is gone")
 			return nil
 		}
-		klog.V(5).Infof("Error getting pod %s/%s (uid: %q) from informer : %v", pod.Namespace, pod.Name, pod.UID, err)
 		return err
 	}
 
 	// Ignore pods which are already getting deleted.
 	if pod.DeletionTimestamp != nil {
-		klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is marked for deletion", key)
+		logger.V(5).Info("nothing to do for pod, it is marked for deletion")
 		return nil
 	}
 
-	for _, vol := range pod.Spec.Volumes {
-		if err := ec.handleVolume(ctx, pod, vol); err != nil {
-			ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
-			return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
+	for _, podClaim := range pod.Spec.ResourceClaims {
+		if err := ec.handleClaim(ctx, pod, podClaim); err != nil {
+			ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
+			return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
 		}
 	}
 
 	return nil
 }
 
-// handleEphemeralVolume is invoked for each volume of a pod.
-func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
-	klog.V(5).Infof("ephemeral: checking volume %s", vol.Name)
-	if vol.Ephemeral == nil {
+// handleClaim is invoked for each resource claim of a pod.
+func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name)
+	ctx = klog.NewContext(ctx, logger)
+	logger.V(5).Info("checking", "podClaim", podClaim.Name)
+	templateName := podClaim.Source.ResourceClaimTemplateName
+	if templateName == nil {
 		return nil
 	}
 
-	pvcName := ephemeral.VolumeClaimName(pod, &vol)
-	pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
+	claimName := resourceclaim.Name(pod, &podClaim)
+	claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
 	if err != nil && !errors.IsNotFound(err) {
 		return err
 	}
-	if pvc != nil {
-		if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
+	if claim != nil {
+		if err := resourceclaim.IsForPod(pod, claim); err != nil {
 			return err
 		}
 		// Already created, nothing more to do.
-		klog.V(5).Infof("ephemeral: volume %s: PVC %s already created", vol.Name, pvcName)
+		logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName)
 		return nil
 	}
 
-	// Create the PVC with pod as owner.
+	template, err := ec.templateLister.ResourceClaimTemplates(pod.Namespace).Get(*templateName)
+	if err != nil {
+		return fmt.Errorf("resource claim template %q: %v", *templateName, err)
+	}
+
+	// Create the ResourceClaim with pod as owner.
 	isTrue := true
-	pvc = &v1.PersistentVolumeClaim{
+	claim = &resourcev1alpha1.ResourceClaim{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: pvcName,
+			Name: claimName,
 			OwnerReferences: []metav1.OwnerReference{
 				{
 					APIVersion: "v1",
@@ -274,16 +366,114 @@ func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
 					BlockOwnerDeletion: &isTrue,
 				},
 			},
-			Annotations: vol.Ephemeral.VolumeClaimTemplate.Annotations,
-			Labels:      vol.Ephemeral.VolumeClaimTemplate.Labels,
+			Annotations: template.Spec.ObjectMeta.Annotations,
+			Labels:      template.Spec.ObjectMeta.Labels,
 		},
-		Spec: vol.Ephemeral.VolumeClaimTemplate.Spec,
+		Spec: template.Spec.Spec,
 	}
-	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc()
-	_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
+	metrics.ResourceClaimCreateAttempts.Inc()
+	_, err = ec.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
 	if err != nil {
-		ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc()
-		return fmt.Errorf("create PVC %s: %v", pvcName, err)
+		metrics.ResourceClaimCreateFailures.Inc()
+		return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
 	}
 	return nil
 }
+
+func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name))
+	ctx = klog.NewContext(ctx, logger)
+	claim, err := ec.claimLister.ResourceClaims(namespace).Get(name)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			logger.V(5).Info("nothing to do for claim, it is gone")
+			return nil
+		}
+		return err
+	}
+
+	// Check if the ReservedFor entries are all still valid.
+	valid := make([]resourcev1alpha1.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor))
+	for _, reservedFor := range claim.Status.ReservedFor {
+		if reservedFor.APIGroup == "" &&
+			reservedFor.Resource == "pods" {
+			// A pod falls into one of three categories:
+			// - we have it in our cache -> don't remove it until we are told that it got removed
+			// - we don't have it in our cache anymore, but we have seen it before -> it was deleted, remove it
+			// - not in our cache, not seen -> double-check with API server before removal
+
+			keepEntry := true
+
+			// Tracking deleted pods in the LRU cache is an
+			// optimization. Without this cache, the code would
+			// have to do the API call below for every deleted pod
+			// to ensure that the pod really doesn't exist. With
+			// the cache, most of the time the pod will be recorded
+			// as deleted and the API call can be avoided.
+			if ec.deletedObjects.Has(reservedFor.UID) {
+				// We know that the pod was deleted. This is
+				// easy to check and thus is done first.
+				keepEntry = false
+			} else {
+				pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name)
+				if err != nil && !errors.IsNotFound(err) {
+					return err
+				}
+				if pod == nil {
+					// We might not have it in our informer cache
+					// yet. Removing the pod while the scheduler is
+					// scheduling it would be bad. We have to be
+					// absolutely sure and thus have to check with
+					// the API server.
+					pod, err := ec.kubeClient.CoreV1().Pods(claim.Namespace).Get(ctx, reservedFor.Name, metav1.GetOptions{})
+					if err != nil && !errors.IsNotFound(err) {
+						return err
+					}
+					if pod == nil || pod.UID != reservedFor.UID {
+						keepEntry = false
+					}
+				} else if pod.UID != reservedFor.UID {
+					// Pod exists, but is a different incarnation under the same name.
+					keepEntry = false
+				}
+			}
+
+			if keepEntry {
+				valid = append(valid, reservedFor)
+			}
+			continue
+		}
+
+		// TODO: support generic object lookup
+		return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor)
+	}
+
+	if len(valid) < len(claim.Status.ReservedFor) {
+		// TODO (#113700): patch
+		claim := claim.DeepCopy()
+		claim.Status.ReservedFor = valid
+		_, err := ec.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (=
+// namespace/name) for ResourceClaimTemplates in a given pod.
+func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {
+	pod, ok := obj.(*v1.Pod)
+	if !ok {
+		return []string{}, nil
+	}
+	keys := []string{}
+	for _, podClaim := range pod.Spec.ResourceClaims {
+		if podClaim.Source.ResourceClaimTemplateName != nil {
+			claimName := resourceclaim.Name(pod, &podClaim)
+			keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, claimName))
+		}
+	}
+	return keys, nil
+}
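Note the work-queue key scheme above: a single queue carries both kinds of
work items, disambiguated by a string prefix. For the fixtures used in the
tests below the keys look like

	pod:my-namespace/test-pod
	claim:my-namespace/test-pod-acme-resource

where the claim name follows the <pod name>-<pod claim name> convention
sketched earlier.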


@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package ephemeral
+package resourceclaim
 
 import (
 	"context"
@@ -22,38 +22,62 @@ import (
 	"sort"
 	"testing"
 
-	"github.com/stretchr/testify/assert"
-
 	v1 "k8s.io/api/core/v1"
-	// storagev1 "k8s.io/api/storage/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	// "k8s.io/apimachinery/pkg/types"
+	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	k8stesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
-	kcache "k8s.io/client-go/tools/cache"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
-	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
+	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+
+	"github.com/stretchr/testify/assert"
 )
 
 var (
 	testPodName          = "test-pod"
 	testNamespace        = "my-namespace"
 	testPodUID           = types.UID("uidpod1")
 	otherNamespace       = "not-my-namespace"
-	ephemeralVolumeName  = "ephemeral-volume"
+	podResourceClaimName = "acme-resource"
+	templateName         = "my-template"
+	className            = "my-resource-class"
 
-	testPod               = makePod(testPodName, testNamespace, testPodUID)
-	testPodWithEphemeral  = makePod(testPodName, testNamespace, testPodUID, *makeEphemeralVolume(ephemeralVolumeName))
-	testPodEphemeralClaim = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, makeOwnerReference(testPodWithEphemeral, true))
-	conflictingClaim      = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, nil)
-	otherNamespaceClaim   = makePVC(testPodName+"-"+ephemeralVolumeName, otherNamespace, nil)
+	testPod             = makePod(testPodName, testNamespace, testPodUID)
+	testPodWithResource = makePod(testPodName, testNamespace, testPodUID, *makePodResourceClaim(podResourceClaimName, templateName))
+	otherTestPod        = makePod(testPodName+"-II", testNamespace, testPodUID+"-II")
+	testClaim           = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true))
+	testClaimReserved   = func() *resourcev1alpha1.ResourceClaim {
+		claim := testClaim.DeepCopy()
+		claim.Status.ReservedFor = append(claim.Status.ReservedFor,
+			resourcev1alpha1.ResourceClaimConsumerReference{
+				Resource: "pods",
+				Name:     testPodWithResource.Name,
+				UID:      testPodWithResource.UID,
+			},
+		)
+		return claim
+	}()
+	testClaimReservedTwice = func() *resourcev1alpha1.ResourceClaim {
+		claim := testClaimReserved.DeepCopy()
+		claim.Status.ReservedFor = append(claim.Status.ReservedFor,
+			resourcev1alpha1.ResourceClaimConsumerReference{
+				Resource: "pods",
+				Name:     otherTestPod.Name,
+				UID:      otherTestPod.UID,
+			},
+		)
+		return claim
+	}()
+	conflictingClaim    = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, nil)
+	otherNamespaceClaim = makeClaim(testPodName+"-"+podResourceClaimName, otherNamespace, className, nil)
+	template            = makeTemplate(templateName, testNamespace, className)
 )
 
 func init() {
@@ -63,104 +87,170 @@ func init() {
 
 func TestSyncHandler(t *testing.T) {
 	tests := []struct {
 		name            string
-		podKey          string
-		pvcs            []*v1.PersistentVolumeClaim
+		key             string
+		claims          []*resourcev1alpha1.ResourceClaim
 		pods            []*v1.Pod
-		expectedPVCs    []v1.PersistentVolumeClaim
+		podsLater       []*v1.Pod
+		templates       []*resourcev1alpha1.ResourceClaimTemplate
+		expectedClaims  []resourcev1alpha1.ResourceClaim
 		expectedError   bool
 		expectedMetrics expectedMetrics
 	}{
 		{
-			name:            "create",
-			pods:            []*v1.Pod{testPodWithEphemeral},
-			podKey:          podKey(testPodWithEphemeral),
-			expectedPVCs:    []v1.PersistentVolumeClaim{*testPodEphemeralClaim},
+			name:            "create",
+			pods:            []*v1.Pod{testPodWithResource},
+			templates:       []*resourcev1alpha1.ResourceClaimTemplate{template},
+			key:             podKey(testPodWithResource),
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaim},
 			expectedMetrics: expectedMetrics{1, 0},
 		},
 		{
-			name:   "no-such-pod",
-			podKey: podKey(testPodWithEphemeral),
+			name:          "missing-template",
+			pods:          []*v1.Pod{testPodWithResource},
+			templates:     nil,
+			key:           podKey(testPodWithResource),
+			expectedError: true,
+		},
+		{
+			name:            "nop",
+			pods:            []*v1.Pod{testPodWithResource},
+			key:             podKey(testPodWithResource),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaim},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaim},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name: "no-such-pod",
+			key:  podKey(testPodWithResource),
 		},
 		{
 			name: "pod-deleted",
 			pods: func() []*v1.Pod {
 				deleted := metav1.Now()
-				pods := []*v1.Pod{testPodWithEphemeral.DeepCopy()}
+				pods := []*v1.Pod{testPodWithResource.DeepCopy()}
 				pods[0].DeletionTimestamp = &deleted
 				return pods
 			}(),
-			podKey: podKey(testPodWithEphemeral),
+			key: podKey(testPodWithResource),
 		},
 		{
-			name:   "no-volumes",
-			pods:   []*v1.Pod{testPod},
-			podKey: podKey(testPod),
+			name: "no-volumes",
+			pods: []*v1.Pod{testPod},
+			key:  podKey(testPod),
 		},
 		{
-			name:            "create-with-other-PVC",
-			pods:            []*v1.Pod{testPodWithEphemeral},
-			podKey:          podKey(testPodWithEphemeral),
-			pvcs:            []*v1.PersistentVolumeClaim{otherNamespaceClaim},
-			expectedPVCs:    []v1.PersistentVolumeClaim{*otherNamespaceClaim, *testPodEphemeralClaim},
+			name:            "create-with-other-claim",
+			pods:            []*v1.Pod{testPodWithResource},
+			templates:       []*resourcev1alpha1.ResourceClaimTemplate{template},
+			key:             podKey(testPodWithResource),
+			claims:          []*resourcev1alpha1.ResourceClaim{otherNamespaceClaim},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*otherNamespaceClaim, *testClaim},
 			expectedMetrics: expectedMetrics{1, 0},
 		},
 		{
-			name:          "wrong-PVC-owner",
-			pods:          []*v1.Pod{testPodWithEphemeral},
-			podKey:        podKey(testPodWithEphemeral),
-			pvcs:          []*v1.PersistentVolumeClaim{conflictingClaim},
-			expectedPVCs:  []v1.PersistentVolumeClaim{*conflictingClaim},
+			name:           "wrong-claim-owner",
+			pods:           []*v1.Pod{testPodWithResource},
+			key:            podKey(testPodWithResource),
+			claims:         []*resourcev1alpha1.ResourceClaim{conflictingClaim},
+			expectedClaims: []resourcev1alpha1.ResourceClaim{*conflictingClaim},
 			expectedError: true,
 		},
 		{
-			name:            "create-conflict",
-			pods:            []*v1.Pod{testPodWithEphemeral},
-			podKey:          podKey(testPodWithEphemeral),
+			name:            "create-conflict",
+			pods:            []*v1.Pod{testPodWithResource},
+			templates:       []*resourcev1alpha1.ResourceClaimTemplate{template},
+			key:             podKey(testPodWithResource),
 			expectedMetrics: expectedMetrics{1, 1},
 			expectedError:   true,
 		},
+		{
+			name:            "stay-reserved-seen",
+			pods:            []*v1.Pod{testPodWithResource},
+			key:             claimKey(testClaimReserved),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReserved},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaimReserved},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name:            "stay-reserved-not-seen",
+			podsLater:       []*v1.Pod{testPodWithResource},
+			key:             claimKey(testClaimReserved),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReserved},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaimReserved},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name:            "clear-reserved",
+			pods:            []*v1.Pod{},
+			key:             claimKey(testClaimReserved),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReserved},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaim},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
+		{
+			name:            "remove-reserved",
+			pods:            []*v1.Pod{testPod},
+			key:             claimKey(testClaimReservedTwice),
+			claims:          []*resourcev1alpha1.ResourceClaim{testClaimReservedTwice},
+			expectedClaims:  []resourcev1alpha1.ResourceClaim{*testClaimReserved},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
 	}
 
 	for _, tc := range tests {
 		// Run sequentially because of global logging and global metrics.
 		t.Run(tc.name, func(t *testing.T) {
-			// There is no good way to shut down the informers. They spawn
-			// various goroutines and some of them (in particular shared informer)
-			// become very unhappy ("close on closed channel") when using a context
-			// that gets cancelled. Therefore we just keep everything running.
-			ctx := context.Background()
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
 
 			var objects []runtime.Object
 			for _, pod := range tc.pods {
 				objects = append(objects, pod)
 			}
-			for _, pvc := range tc.pvcs {
-				objects = append(objects, pvc)
+			for _, claim := range tc.claims {
+				objects = append(objects, claim)
+			}
+			for _, template := range tc.templates {
+				objects = append(objects, template)
 			}
 
 			fakeKubeClient := createTestClient(objects...)
 			if tc.expectedMetrics.numFailures > 0 {
-				fakeKubeClient.PrependReactor("create", "persistentvolumeclaims", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+				fakeKubeClient.PrependReactor("create", "resourceclaims", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
 					return true, nil, apierrors.NewConflict(action.GetResource().GroupResource(), "fake name", errors.New("fake conflict"))
 				})
 			}
 			setupMetrics()
 			informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 			podInformer := informerFactory.Core().V1().Pods()
-			pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
+			claimInformer := informerFactory.Resource().V1alpha1().ResourceClaims()
+			templateInformer := informerFactory.Resource().V1alpha1().ResourceClaimTemplates()
 
-			c, err := NewController(fakeKubeClient, podInformer, pvcInformer)
+			ec, err := NewController(fakeKubeClient, podInformer, claimInformer, templateInformer)
 			if err != nil {
 				t.Fatalf("error creating ephemeral controller : %v", err)
 			}
-			ec, _ := c.(*ephemeralController)
 
 			// Ensure informers are up-to-date.
 			go informerFactory.Start(ctx.Done())
+			stopInformers := func() {
+				cancel()
+				informerFactory.Shutdown()
+			}
+			defer stopInformers()
 			informerFactory.WaitForCacheSync(ctx.Done())
-			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced)
+			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, claimInformer.Informer().HasSynced, templateInformer.Informer().HasSynced)
 
-			err = ec.syncHandler(context.TODO(), tc.podKey)
+			// Simulate race: stop informers, add more pods that the controller doesn't know about.
+			stopInformers()
+			for _, pod := range tc.podsLater {
+				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+				if err != nil {
+					t.Fatalf("unexpected error while creating pod: %v", err)
+				}
+			}
+
+			err = ec.syncHandler(context.TODO(), tc.key)
 			if err != nil && !tc.expectedError {
 				t.Fatalf("unexpected error while running handler: %v", err)
 			}
@@ -168,53 +258,68 @@ func TestSyncHandler(t *testing.T) {
 				t.Fatalf("unexpected success")
 			}
 
-			pvcs, err := fakeKubeClient.CoreV1().PersistentVolumeClaims("").List(ctx, metav1.ListOptions{})
+			claims, err := fakeKubeClient.ResourceV1alpha1().ResourceClaims("").List(ctx, metav1.ListOptions{})
 			if err != nil {
-				t.Fatalf("unexpected error while listing PVCs: %v", err)
+				t.Fatalf("unexpected error while listing claims: %v", err)
 			}
-			assert.Equal(t, sortPVCs(tc.expectedPVCs), sortPVCs(pvcs.Items))
+			assert.Equal(t, normalizeClaims(tc.expectedClaims), normalizeClaims(claims.Items))
 			expectMetrics(t, tc.expectedMetrics)
 		})
 	}
 }
 
-func makePVC(name, namespace string, owner *metav1.OwnerReference) *v1.PersistentVolumeClaim {
-	pvc := &v1.PersistentVolumeClaim{
-		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
-		Spec:       v1.PersistentVolumeClaimSpec{},
+func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference) *resourcev1alpha1.ResourceClaim {
+	claim := &resourcev1alpha1.ResourceClaim{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
+		Spec: resourcev1alpha1.ResourceClaimSpec{
+			ResourceClassName: classname,
+		},
 	}
 	if owner != nil {
-		pvc.OwnerReferences = []metav1.OwnerReference{*owner}
+		claim.OwnerReferences = []metav1.OwnerReference{*owner}
 	}
 
-	return pvc
+	return claim
 }
 
-func makeEphemeralVolume(name string) *v1.Volume {
-	return &v1.Volume{
+func makePodResourceClaim(name, templateName string) *v1.PodResourceClaim {
+	return &v1.PodResourceClaim{
 		Name: name,
-		VolumeSource: v1.VolumeSource{
-			Ephemeral: &v1.EphemeralVolumeSource{
-				VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{},
-			},
+		Source: v1.ClaimSource{
+			ResourceClaimTemplateName: &templateName,
 		},
 	}
 }
 
-func makePod(name, namespace string, uid types.UID, volumes ...v1.Volume) *v1.Pod {
-	pvc := &v1.Pod{
+func makePod(name, namespace string, uid types.UID, podClaims ...v1.PodResourceClaim) *v1.Pod {
+	pod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, UID: uid},
 		Spec: v1.PodSpec{
-			Volumes: volumes,
+			ResourceClaims: podClaims,
 		},
 	}
 
-	return pvc
+	return pod
+}
+
+func makeTemplate(name, namespace, classname string) *resourcev1alpha1.ResourceClaimTemplate {
+	template := &resourcev1alpha1.ResourceClaimTemplate{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
+		Spec: resourcev1alpha1.ResourceClaimTemplateSpec{
+			Spec: resourcev1alpha1.ResourceClaimSpec{
+				ResourceClassName: classname,
+			},
+		},
+	}
+
+	return template
 }
 
 func podKey(pod *v1.Pod) string {
-	key, _ := kcache.DeletionHandlingMetaNamespaceKeyFunc(testPodWithEphemeral)
-	return key
+	return podKeyPrefix + pod.Namespace + "/" + pod.Name
+}
+
+func claimKey(claim *resourcev1alpha1.ResourceClaim) string {
+	return claimKeyPrefix + claim.Namespace + "/" + claim.Name
 }
 
 func makeOwnerReference(pod *v1.Pod, isController bool) *metav1.OwnerReference {
@ -229,12 +334,17 @@ func makeOwnerReference(pod *v1.Pod, isController bool) *metav1.OwnerReference {
} }
} }
-func sortPVCs(pvcs []v1.PersistentVolumeClaim) []v1.PersistentVolumeClaim {
-	sort.Slice(pvcs, func(i, j int) bool {
-		return pvcs[i].Namespace < pvcs[j].Namespace ||
-			pvcs[i].Name < pvcs[j].Name
+func normalizeClaims(claims []resourcev1alpha1.ResourceClaim) []resourcev1alpha1.ResourceClaim {
+	sort.Slice(claims, func(i, j int) bool {
+		return claims[i].Namespace < claims[j].Namespace ||
+			claims[i].Name < claims[j].Name
 	})
-	return pvcs
+	for i := range claims {
+		if len(claims[i].Status.ReservedFor) == 0 {
+			claims[i].Status.ReservedFor = nil
+		}
+	}
+	return claims
 }
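The nil-vs-empty normalization matters because assert.Equal does a deep comparison that distinguishes a nil slice from an empty one, and a claim that has round-tripped through the fake client may come back with either. A minimal illustration; the ReservedFor element type is assumed from the v1alpha1 API:

	var a, b resourcev1alpha1.ResourceClaim
	b.Status.ReservedFor = []resourcev1alpha1.ResourceClaimConsumerReference{}
	// assert.Equal(t, a, b) would fail here: nil and empty are not deeply equal.
	// After normalizeClaims, both ReservedFor fields are nil and the claims match.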
 func createTestClient(objects ...runtime.Object) *fake.Clientset {
@@ -252,15 +362,15 @@ type expectedMetrics struct {
 func expectMetrics(t *testing.T, em expectedMetrics) {
 	t.Helper()
 
-	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateAttempts)
-	handleErr(t, err, "ephemeralVolumeCreate")
+	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateAttempts)
+	handleErr(t, err, "ResourceClaimCreate")
 	if actualCreated != float64(em.numCreated) {
-		t.Errorf("Expected PVCs to be created %d, got %v", em.numCreated, actualCreated)
+		t.Errorf("Expected claims to be created %d, got %v", em.numCreated, actualCreated)
 	}
-	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateFailures)
-	handleErr(t, err, "ephemeralVolumeCreate/Conflict")
+	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateFailures)
+	handleErr(t, err, "ResourceClaimCreate/Conflict")
 	if actualConflicts != float64(em.numFailures) {
-		t.Errorf("Expected PVCs to have conflicts %d, got %v", em.numFailures, actualConflicts)
+		t.Errorf("Expected claims to have conflicts %d, got %v", em.numFailures, actualConflicts)
 	}
 }
@@ -272,6 +382,6 @@ func handleErr(t *testing.T, err error, metricName string) {
 func setupMetrics() {
 	ephemeralvolumemetrics.RegisterMetrics()
-	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Reset()
-	ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Reset()
+	ephemeralvolumemetrics.ResourceClaimCreateAttempts.Reset()
+	ephemeralvolumemetrics.ResourceClaimCreateFailures.Reset()
 }

View File

@@ -14,8 +14,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package ephemeral implements the controller part of
-// https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1698-generic-ephemeral-volumes
+// Package resourceclaim implements the controller part of
+// https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/3063-dynamic-resource-allocation
 //
-// It was derived from the expand controller.
-package ephemeral
+// It was derived from the generic ephemeral volume controller.
+package resourceclaim

View File

@@ -0,0 +1,6 @@
# See the OWNERS docs at https://go.k8s.io/owners
reviewers:
- sig-instrumentation-reviewers
labels:
- sig/instrumentation

View File

@@ -23,36 +23,36 @@ import (
 	"k8s.io/component-base/metrics/legacyregistry"
 )
 
-// EphemeralVolumeSubsystem - subsystem name used for Endpoint Slices.
-const EphemeralVolumeSubsystem = "ephemeral_volume_controller"
+// ResourceClaimSubsystem - subsystem name used for ResourceClaim creation.
+const ResourceClaimSubsystem = "resourceclaim_controller"
 
 var (
-	// EphemeralVolumeCreateAttempts tracks the number of
-	// PersistentVolumeClaims().Create calls (both successful and unsuccessful)
-	EphemeralVolumeCreateAttempts = metrics.NewCounter(
+	// ResourceClaimCreateAttempts tracks the number of
+	// ResourceClaims().Create calls (both successful and unsuccessful)
+	ResourceClaimCreateAttempts = metrics.NewCounter(
 		&metrics.CounterOpts{
-			Subsystem:      EphemeralVolumeSubsystem,
-			Name:           "create_total",
-			Help:           "Number of PersistenVolumeClaims creation requests",
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "create_attempts_total",
+			Help:           "Number of ResourceClaims creation requests",
 			StabilityLevel: metrics.ALPHA,
 		})
-	// EphemeralVolumeCreateFailures tracks the number of unsuccessful
-	// PersistentVolumeClaims().Create calls
-	EphemeralVolumeCreateFailures = metrics.NewCounter(
+	// ResourceClaimCreateFailures tracks the number of unsuccessful
+	// ResourceClaims().Create calls
+	ResourceClaimCreateFailures = metrics.NewCounter(
 		&metrics.CounterOpts{
-			Subsystem:      EphemeralVolumeSubsystem,
-			Name:           "create_failures_total",
-			Help:           "Number of PersistenVolumeClaims creation requests",
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "create_failures_total",
+			Help:           "Number of ResourceClaims creation request failures",
 			StabilityLevel: metrics.ALPHA,
 		})
 )
 
 var registerMetrics sync.Once
 
-// RegisterMetrics registers EphemeralVolume metrics.
+// RegisterMetrics registers ResourceClaim metrics.
 func RegisterMetrics() {
 	registerMetrics.Do(func() {
-		legacyregistry.MustRegister(EphemeralVolumeCreateAttempts)
-		legacyregistry.MustRegister(EphemeralVolumeCreateFailures)
+		legacyregistry.MustRegister(ResourceClaimCreateAttempts)
+		legacyregistry.MustRegister(ResourceClaimCreateFailures)
 	})
 }
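For context, a hedged sketch of how the controller side is expected to bump these counters around claim creation; the call site, receiver, and import alias are assumptions, not code from this file:

	// metrics refers to this package; ec.kubeClient and claim are assumed in scope.
	metrics.ResourceClaimCreateAttempts.Inc()
	createdClaim, err := ec.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Create(ctx, claim, metav1.CreateOptions{})
	if err != nil {
		metrics.ResourceClaimCreateFailures.Inc()
		return fmt.Errorf("create ResourceClaim %s: %v", claim.Name, err)
	}
	_ = createdClaim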

View File

@@ -0,0 +1,53 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourceclaim

import (
	"sync"

	"github.com/golang/groupcache/lru"

	"k8s.io/apimachinery/pkg/types"
)

// uidCache is an LRU cache for uid.
type uidCache struct {
	mutex sync.Mutex
	cache *lru.Cache
}

// newUIDCache returns a uidCache.
func newUIDCache(maxCacheEntries int) *uidCache {
	return &uidCache{
		cache: lru.New(maxCacheEntries),
	}
}

// Add adds a uid to the cache.
func (c *uidCache) Add(uid types.UID) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.cache.Add(uid, nil)
}

// Has returns if a uid is in the cache.
func (c *uidCache) Has(uid types.UID) bool {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	_, found := c.cache.Get(uid)
	return found
}
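Why an LRU of pod UIDs: per the commit description, the controller may remove a pod from a claim's ReservedFor list only when that pod is known to not need the claim anymore, and the pod object may already be deleted by the time the claim is reconciled. A bounded UID cache keeps that knowledge around without growing forever. A usage sketch; the capacity and call sites are assumptions:

	deletedPods := newUIDCache(500) // capacity chosen for illustration

	// In the pod delete handler:
	deletedPods.Add(pod.UID)

	// When reconciling claim.Status.ReservedFor entries:
	if deletedPods.Has(reservedFor.UID) {
		// This pod is known to be gone; the entry can be dropped safely.
	}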

View File

@@ -205,6 +205,19 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
 		},
 	})
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+		addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
+			ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "resource-claim-controller"},
+			Rules: []rbacv1.PolicyRule{
+				rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
+				rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(),
+				rbacv1helpers.NewRule("get", "list", "watch", "create").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(),
+				rbacv1helpers.NewRule("update", "patch").Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(),
+				eventsRule(),
+			},
+		})
+	}
+
 	addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
 		ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "generic-garbage-collector"},
 		Rules: []rbacv1.PolicyRule{
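For readers less familiar with the rbacv1helpers builder: the resourceclaims rule in the new role above expands to roughly the following PolicyRule (hand-expanded here for reference, not generated output):

	rule := rbacv1.PolicyRule{
		Verbs:     []string{"get", "list", "watch", "create"},
		APIGroups: []string{"resource.k8s.io"},
		Resources: []string{"resourceclaims"},
	}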

View File

@@ -54,6 +54,7 @@ const (
 	extensionsGroup    = "extensions"
 	policyGroup        = "policy"
 	rbacGroup          = "rbac.authorization.k8s.io"
+	resourceGroup      = "resource.k8s.io"
 	storageGroup       = "storage.k8s.io"
 	resMetricsGroup    = "metrics.k8s.io"
 	customMetricsGroup = "custom.metrics.k8s.io"

vendor/modules.txt vendored
View File

@@ -2053,6 +2053,9 @@ k8s.io/cri-api/pkg/errors
 ## explicit; go 1.19
 k8s.io/csi-translation-lib
 k8s.io/csi-translation-lib/plugins
+# k8s.io/dynamic-resource-allocation v0.0.0 => ./staging/src/k8s.io/dynamic-resource-allocation
+## explicit; go 1.19
+k8s.io/dynamic-resource-allocation/resourceclaim
 # k8s.io/gengo v0.0.0-20220902162205-c0856e24416d
 ## explicit; go 1.13
 k8s.io/gengo/args