From b87530af4fd1dac526c9dbde1a4f17f0e212f4a6 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 22 Mar 2022 09:59:20 +0100 Subject: [PATCH] kube-controller-manager: clone resource controller from volume/ephemeral --- pkg/controller/resourceclaim/OWNERS | 6 + pkg/controller/resourceclaim/config/doc.go | 19 ++ pkg/controller/resourceclaim/config/types.go | 25 ++ .../config/v1alpha1/conversion.go | 40 +++ .../resourceclaim/config/v1alpha1/defaults.go | 36 +++ .../resourceclaim/config/v1alpha1/doc.go | 21 ++ .../resourceclaim/config/v1alpha1/register.go | 31 ++ .../v1alpha1/zz_generated.conversion.go | 92 ++++++ .../config/v1alpha1/zz_generated.deepcopy.go | 22 ++ .../config/zz_generated.deepcopy.go | 38 +++ pkg/controller/resourceclaim/controller.go | 289 ++++++++++++++++++ .../resourceclaim/controller_test.go | 277 +++++++++++++++++ pkg/controller/resourceclaim/doc.go | 21 ++ .../resourceclaim/metrics/metrics.go | 58 ++++ 14 files changed, 975 insertions(+) create mode 100644 pkg/controller/resourceclaim/OWNERS create mode 100644 pkg/controller/resourceclaim/config/doc.go create mode 100644 pkg/controller/resourceclaim/config/types.go create mode 100644 pkg/controller/resourceclaim/config/v1alpha1/conversion.go create mode 100644 pkg/controller/resourceclaim/config/v1alpha1/defaults.go create mode 100644 pkg/controller/resourceclaim/config/v1alpha1/doc.go create mode 100644 pkg/controller/resourceclaim/config/v1alpha1/register.go create mode 100644 pkg/controller/resourceclaim/config/v1alpha1/zz_generated.conversion.go create mode 100644 pkg/controller/resourceclaim/config/v1alpha1/zz_generated.deepcopy.go create mode 100644 pkg/controller/resourceclaim/config/zz_generated.deepcopy.go create mode 100644 pkg/controller/resourceclaim/controller.go create mode 100644 pkg/controller/resourceclaim/controller_test.go create mode 100644 pkg/controller/resourceclaim/doc.go create mode 100644 pkg/controller/resourceclaim/metrics/metrics.go diff --git 
a/pkg/controller/resourceclaim/OWNERS b/pkg/controller/resourceclaim/OWNERS new file mode 100644 index 00000000000..1d5ec5e5051 --- /dev/null +++ b/pkg/controller/resourceclaim/OWNERS @@ -0,0 +1,6 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - saad-ali + - jsafrane + - pohly diff --git a/pkg/controller/resourceclaim/config/doc.go b/pkg/controller/resourceclaim/config/doc.go new file mode 100644 index 00000000000..01d52859902 --- /dev/null +++ b/pkg/controller/resourceclaim/config/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package + +package config // import "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config" diff --git a/pkg/controller/resourceclaim/config/types.go b/pkg/controller/resourceclaim/config/types.go new file mode 100644 index 00000000000..64af06a1b2b --- /dev/null +++ b/pkg/controller/resourceclaim/config/types.go @@ -0,0 +1,25 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +// EphemeralVolumeControllerConfiguration contains elements describing EphemeralVolumeController. +type EphemeralVolumeControllerConfiguration struct { + // ConcurrentEphemeralVolumeSyncs is the number of ephemeral volume syncing operations + // that will be done concurrently. Larger number = faster ephemeral volume updating, + // but more CPU (and network) load. + ConcurrentEphemeralVolumeSyncs int32 +} diff --git a/pkg/controller/resourceclaim/config/v1alpha1/conversion.go b/pkg/controller/resourceclaim/config/v1alpha1/conversion.go new file mode 100644 index 00000000000..d3a5fb58e96 --- /dev/null +++ b/pkg/controller/resourceclaim/config/v1alpha1/conversion.go @@ -0,0 +1,40 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/kube-controller-manager/config/v1alpha1" + "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config" +) + +// Important! The public back-and-forth conversion functions for the types in this package +// with EphemeralVolumeControllerConfiguration types need to be manually exposed like this in order for +// other packages that reference this package to be able to call these conversion functions +// in an autogenerated manner. 
+// TODO: Fix the bug in conversion-gen so it automatically discovers these Convert_* functions +// in autogenerated code as well. + +// Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration is an autogenerated conversion function. +func Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in *v1alpha1.EphemeralVolumeControllerConfiguration, out *config.EphemeralVolumeControllerConfiguration, s conversion.Scope) error { + return autoConvert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in, out, s) +} + +// Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration is an autogenerated conversion function. +func Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in *config.EphemeralVolumeControllerConfiguration, out *v1alpha1.EphemeralVolumeControllerConfiguration, s conversion.Scope) error { + return autoConvert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in, out, s) +} diff --git a/pkg/controller/resourceclaim/config/v1alpha1/defaults.go b/pkg/controller/resourceclaim/config/v1alpha1/defaults.go new file mode 100644 index 00000000000..1907699d6cf --- /dev/null +++ b/pkg/controller/resourceclaim/config/v1alpha1/defaults.go @@ -0,0 +1,36 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + kubectrlmgrconfigv1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1" +) + +// RecommendedDefaultEphemeralVolumeControllerConfiguration defaults a pointer to a +// EphemeralVolumeControllerConfiguration struct. This will set the recommended default +// values, but they may be subject to change between API versions. This function +// is intentionally not registered in the scheme as a "normal" `SetDefaults_Foo` +// function to allow consumers of this type to set whatever defaults for their +// embedded configs. Forcing consumers to use these defaults would be problematic +// as defaulting in the scheme is done as part of the conversion, and there would +// be no easy way to opt-out. Instead, if you want to use this defaulting method +// run it in your wrapper struct of this type in its `SetDefaults_` method. +func RecommendedDefaultEphemeralVolumeControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.EphemeralVolumeControllerConfiguration) { + if obj.ConcurrentEphemeralVolumeSyncs == 0 { + obj.ConcurrentEphemeralVolumeSyncs = 5 + } +} diff --git a/pkg/controller/resourceclaim/config/v1alpha1/doc.go b/pkg/controller/resourceclaim/config/v1alpha1/doc.go new file mode 100644 index 00000000000..0ed1dcc5526 --- /dev/null +++ b/pkg/controller/resourceclaim/config/v1alpha1/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package +// +k8s:conversion-gen=k8s.io/kubernetes/pkg/controller/volume/ephemeral/config +// +k8s:conversion-gen-external-types=k8s.io/kube-controller-manager/config/v1alpha1 + +package v1alpha1 // import "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config/v1alpha1" diff --git a/pkg/controller/resourceclaim/config/v1alpha1/register.go b/pkg/controller/resourceclaim/config/v1alpha1/register.go new file mode 100644 index 00000000000..63cff4d8810 --- /dev/null +++ b/pkg/controller/resourceclaim/config/v1alpha1/register.go @@ -0,0 +1,31 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime" +) + +var ( + // SchemeBuilder is the scheme builder with scheme init functions to run for this API package + SchemeBuilder runtime.SchemeBuilder + // localSchemeBuilder extends the SchemeBuilder instance with the external types. In this package, + // defaulting and conversion init funcs are registered as well. 
+ localSchemeBuilder = &SchemeBuilder + // AddToScheme is a global function that registers this API group & version to a scheme + AddToScheme = localSchemeBuilder.AddToScheme +) diff --git a/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.conversion.go b/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.conversion.go new file mode 100644 index 00000000000..34697f48409 --- /dev/null +++ b/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.conversion.go @@ -0,0 +1,92 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by conversion-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" + v1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1" + config "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config" +) + +func init() { + localSchemeBuilder.Register(RegisterConversions) +} + +// RegisterConversions adds conversion functions to the given scheme. +// Public to allow building arbitrary schemes. 
+func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*v1alpha1.GroupResource)(nil), (*v1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_GroupResource_To_v1_GroupResource(a.(*v1alpha1.GroupResource), b.(*v1.GroupResource), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1.GroupResource)(nil), (*v1alpha1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1_GroupResource_To_v1alpha1_GroupResource(a.(*v1.GroupResource), b.(*v1alpha1.GroupResource), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*config.EphemeralVolumeControllerConfiguration)(nil), (*v1alpha1.EphemeralVolumeControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(a.(*config.EphemeralVolumeControllerConfiguration), b.(*v1alpha1.EphemeralVolumeControllerConfiguration), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*v1alpha1.EphemeralVolumeControllerConfiguration)(nil), (*config.EphemeralVolumeControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(a.(*v1alpha1.EphemeralVolumeControllerConfiguration), b.(*config.EphemeralVolumeControllerConfiguration), scope) + }); err != nil { + return err + } + return nil +} + +func autoConvert_v1alpha1_EphemeralVolumeControllerConfiguration_To_config_EphemeralVolumeControllerConfiguration(in *v1alpha1.EphemeralVolumeControllerConfiguration, out *config.EphemeralVolumeControllerConfiguration, s conversion.Scope) error { + out.ConcurrentEphemeralVolumeSyncs = in.ConcurrentEphemeralVolumeSyncs + return nil +} + +func 
autoConvert_config_EphemeralVolumeControllerConfiguration_To_v1alpha1_EphemeralVolumeControllerConfiguration(in *config.EphemeralVolumeControllerConfiguration, out *v1alpha1.EphemeralVolumeControllerConfiguration, s conversion.Scope) error { + out.ConcurrentEphemeralVolumeSyncs = in.ConcurrentEphemeralVolumeSyncs + return nil +} + +func autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error { + out.Group = in.Group + out.Resource = in.Resource + return nil +} + +// Convert_v1alpha1_GroupResource_To_v1_GroupResource is an autogenerated conversion function. +func Convert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error { + return autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in, out, s) +} + +func autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error { + out.Group = in.Group + out.Resource = in.Resource + return nil +} + +// Convert_v1_GroupResource_To_v1alpha1_GroupResource is an autogenerated conversion function. +func Convert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error { + return autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in, out, s) +} diff --git a/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.deepcopy.go b/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..61f6555edfc --- /dev/null +++ b/pkg/controller/resourceclaim/config/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,22 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1alpha1 diff --git a/pkg/controller/resourceclaim/config/zz_generated.deepcopy.go b/pkg/controller/resourceclaim/config/zz_generated.deepcopy.go new file mode 100644 index 00000000000..467824bb7ec --- /dev/null +++ b/pkg/controller/resourceclaim/config/zz_generated.deepcopy.go @@ -0,0 +1,38 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package config + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EphemeralVolumeControllerConfiguration) DeepCopyInto(out *EphemeralVolumeControllerConfiguration) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EphemeralVolumeControllerConfiguration. 
+func (in *EphemeralVolumeControllerConfiguration) DeepCopy() *EphemeralVolumeControllerConfiguration { + if in == nil { + return nil + } + out := new(EphemeralVolumeControllerConfiguration) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go new file mode 100644 index 00000000000..f40527cba76 --- /dev/null +++ b/pkg/controller/resourceclaim/controller.go @@ -0,0 +1,289 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package ephemeral + +import ( + "context" + "fmt" + "time" + + "k8s.io/klog/v2" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + kcache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/component-helpers/storage/ephemeral" + "k8s.io/kubernetes/pkg/controller/volume/common" + ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics" + "k8s.io/kubernetes/pkg/controller/volume/events" +) + +// Controller creates PVCs for ephemeral inline volumes in a pod spec. +type Controller interface { + Run(ctx context.Context, workers int) +} + +type ephemeralController struct { + // kubeClient is the kube API client used by volumehost to communicate with + // the API server. + kubeClient clientset.Interface + + // pvcLister is the shared PVC lister used to fetch and store PVC + // objects from the API server. It is shared with other controllers and + // therefore the PVC objects in its store should be treated as immutable. + pvcLister corelisters.PersistentVolumeClaimLister + pvcsSynced kcache.InformerSynced + + // podLister is the shared Pod lister used to fetch Pod + // objects from the API server. It is shared with other controllers and + // therefore the Pod objects in its store should be treated as immutable. + podLister corelisters.PodLister + podSynced kcache.InformerSynced + + // podIndexer has the common PodPVC indexer installed to + // limit iteration over pods to those of interest. 
+ podIndexer cache.Indexer + + // recorder is used to record events in the API server + recorder record.EventRecorder + + queue workqueue.RateLimitingInterface +} + +// NewController creates an ephemeral volume controller. +func NewController( + kubeClient clientset.Interface, + podInformer coreinformers.PodInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) { + + ec := &ephemeralController{ + kubeClient: kubeClient, + podLister: podInformer.Lister(), + podIndexer: podInformer.Informer().GetIndexer(), + podSynced: podInformer.Informer().HasSynced, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"), + } + + ephemeralvolumemetrics.RegisterMetrics() + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"}) + + podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ + AddFunc: ec.enqueuePod, + // The pod spec is immutable. Therefore the controller can ignore pod updates + // because there cannot be any changes that have to be copied into the generated + // PVC. + // Deletion of the PVC is handled through the owner reference and garbage collection. + // Therefore pod deletions also can be ignored. 
+ }) + pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ + DeleteFunc: ec.onPVCDelete, + }) + if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil { + return nil, fmt.Errorf("could not initialize ephemeral volume controller: %w", err) + } + + return ec, nil +} + +func (ec *ephemeralController) enqueuePod(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + return + } + + // Ignore pods which are already getting deleted. + if pod.DeletionTimestamp != nil { + return + } + + for _, vol := range pod.Spec.Volumes { + if vol.Ephemeral != nil { + // It has at least one ephemeral inline volume, work on it. + key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pod) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err)) + return + } + ec.queue.Add(key) + break + } + } +} + +func (ec *ephemeralController) onPVCDelete(obj interface{}) { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return + } + + // Someone deleted a PVC, either intentionally or + // accidentally. If there is a pod referencing it because of + // an ephemeral volume, then we should re-create the PVC. + // The common indexer does some prefiltering for us by + // limiting the list to those pods which reference + // the PVC. 
+ objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name)) + if err != nil { + runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err)) + return + } + for _, obj := range objs { + ec.enqueuePod(obj) + } +} + +func (ec *ephemeralController) Run(ctx context.Context, workers int) { + defer runtime.HandleCrash() + defer ec.queue.ShutDown() + + klog.Infof("Starting ephemeral volume controller") + defer klog.Infof("Shutting down ephemeral volume controller") + + if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.pvcsSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, ec.runWorker, time.Second) + } + + <-ctx.Done() +} + +func (ec *ephemeralController) runWorker(ctx context.Context) { + for ec.processNextWorkItem(ctx) { + } +} + +func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool { + key, shutdown := ec.queue.Get() + if shutdown { + return false + } + defer ec.queue.Done(key) + + err := ec.syncHandler(ctx, key.(string)) + if err == nil { + ec.queue.Forget(key) + return true + } + + runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err)) + ec.queue.AddRateLimited(key) + + return true +} + +// syncHandler is invoked for each pod which might need to be processed. +// If an error is returned from this function, the pod will be requeued. +func (ec *ephemeralController) syncHandler(ctx context.Context, key string) error { + namespace, name, err := kcache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + pod, err := ec.podLister.Pods(namespace).Get(name) + if err != nil { + if errors.IsNotFound(err) { + klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is gone", key) + return nil + } + klog.V(5).Infof("Error getting pod %s/%s from informer : %v", namespace, name, err) + return err + } + + // Ignore pods which are already getting deleted. 
+ if pod.DeletionTimestamp != nil { + klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is marked for deletion", key) + return nil + } + + for _, vol := range pod.Spec.Volumes { + if err := ec.handleVolume(ctx, pod, vol); err != nil { + ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err)) + return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err) + } + } + + return nil +} + +// handleVolume is invoked for each volume of a pod. +func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error { + klog.V(5).Infof("ephemeral: checking volume %s", vol.Name) + if vol.Ephemeral == nil { + return nil + } + + pvcName := ephemeral.VolumeClaimName(pod, &vol) + pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if err != nil && !errors.IsNotFound(err) { + return err + } + if pvc != nil { + if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil { + return err + } + // Already created, nothing more to do. + klog.V(5).Infof("ephemeral: volume %s: PVC %s already created", vol.Name, pvcName) + return nil + } + + // Create the PVC with pod as owner. 
+ isTrue := true + pvc = &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: pod.Name, + UID: pod.UID, + Controller: &isTrue, + BlockOwnerDeletion: &isTrue, + }, + }, + Annotations: vol.Ephemeral.VolumeClaimTemplate.Annotations, + Labels: vol.Ephemeral.VolumeClaimTemplate.Labels, + }, + Spec: vol.Ephemeral.VolumeClaimTemplate.Spec, + } + ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc() + _, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) + if err != nil { + ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc() + return fmt.Errorf("create PVC %s: %v", pvcName, err) + } + return nil +} diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go new file mode 100644 index 00000000000..e6c8d6d396f --- /dev/null +++ b/pkg/controller/resourceclaim/controller_test.go @@ -0,0 +1,277 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package ephemeral
+
+import (
+	"context"
+	"errors"
+	"sort"
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+	// storagev1 "k8s.io/api/storage/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	// "k8s.io/apimachinery/pkg/types"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	k8stesting "k8s.io/client-go/testing"
+	"k8s.io/client-go/tools/cache"
+	kcache "k8s.io/client-go/tools/cache"
+	"k8s.io/component-base/metrics/testutil"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/controller"
+	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// Shared fixtures. testPod and testPodWithEphemeral deliberately use the same
+// name, namespace and UID; they differ only in whether an ephemeral volume is
+// attached.
+var (
+	testPodName         = "test-pod"
+	testNamespace       = "my-namespace"
+	testPodUID          = types.UID("uidpod1")
+	otherNamespace      = "not-my-namespace"
+	ephemeralVolumeName = "ephemeral-volume"
+
+	testPod               = makePod(testPodName, testNamespace, testPodUID)
+	testPodWithEphemeral  = makePod(testPodName, testNamespace, testPodUID, *makeEphemeralVolume(ephemeralVolumeName))
+	testPodEphemeralClaim = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, makeOwnerReference(testPodWithEphemeral, true))
+	conflictingClaim      = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, nil)
+	otherNamespaceClaim   = makePVC(testPodName+"-"+ephemeralVolumeName, otherNamespace, nil)
+)
+
+// init registers the klog command line flags on the default flag set.
+func init() {
+	klog.InitFlags(nil)
+}
+
+// TestSyncHandler runs syncHandler once per table entry against a fake
+// clientset that is pre-populated with the entry's pods and PVCs. Afterwards it
+// compares the PVCs present in the fake clientset and the create counters with
+// the entry's expectations.
+func TestSyncHandler(t *testing.T) {
+	tests := []struct {
+		name            string
+		podKey          string
+		pvcs            []*v1.PersistentVolumeClaim
+		pods            []*v1.Pod
+		expectedPVCs    []v1.PersistentVolumeClaim
+		expectedError   bool
+		expectedMetrics expectedMetrics
+	}{
+		{
+			name:            "create",
+			pods:            []*v1.Pod{testPodWithEphemeral},
+			podKey:          podKey(testPodWithEphemeral),
+			expectedPVCs:    []v1.PersistentVolumeClaim{*testPodEphemeralClaim},
+			expectedMetrics: expectedMetrics{1, 0},
+		},
+		{
+			name:   "no-such-pod",
+			podKey: podKey(testPodWithEphemeral),
+		},
+		{
+			name: "pod-deleted",
+			pods: func() []*v1.Pod {
+				deleted := metav1.Now()
+				pods := []*v1.Pod{testPodWithEphemeral.DeepCopy()}
+				pods[0].DeletionTimestamp = &deleted
+				return pods
+			}(),
+			podKey: podKey(testPodWithEphemeral),
+		},
+		{
+			name:   "no-volumes",
+			pods:   []*v1.Pod{testPod},
+			podKey: podKey(testPod),
+		},
+		{
+			name:            "create-with-other-PVC",
+			pods:            []*v1.Pod{testPodWithEphemeral},
+			podKey:          podKey(testPodWithEphemeral),
+			pvcs:            []*v1.PersistentVolumeClaim{otherNamespaceClaim},
+			expectedPVCs:    []v1.PersistentVolumeClaim{*otherNamespaceClaim, *testPodEphemeralClaim},
+			expectedMetrics: expectedMetrics{1, 0},
+		},
+		{
+			name:          "wrong-PVC-owner",
+			pods:          []*v1.Pod{testPodWithEphemeral},
+			podKey:        podKey(testPodWithEphemeral),
+			pvcs:          []*v1.PersistentVolumeClaim{conflictingClaim},
+			expectedPVCs:  []v1.PersistentVolumeClaim{*conflictingClaim},
+			expectedError: true,
+		},
+		{
+			name:            "create-conflict",
+			pods:            []*v1.Pod{testPodWithEphemeral},
+			podKey:          podKey(testPodWithEphemeral),
+			expectedMetrics: expectedMetrics{1, 1},
+			expectedError:   true,
+		},
+	}
+
+	for _, tc := range tests {
+		// Run sequentially because of global logging and global metrics.
+		t.Run(tc.name, func(t *testing.T) {
+			// There is no good way to shut down the informers. They spawn
+			// various goroutines and some of them (in particular shared informer)
+			// become very unhappy ("close on closed channel") when using a context
+			// that gets cancelled. Therefore we just keep everything running.
+			ctx := context.Background()
+
+			var objects []runtime.Object
+			for _, pod := range tc.pods {
+				objects = append(objects, pod)
+			}
+			for _, pvc := range tc.pvcs {
+				objects = append(objects, pvc)
+			}
+
+			fakeKubeClient := createTestClient(objects...)
+			if tc.expectedMetrics.numFailures > 0 {
+				// Make every PVC create call fail with a conflict so that the
+				// failure counter is exercised.
+				fakeKubeClient.PrependReactor("create", "persistentvolumeclaims", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+					return true, nil, apierrors.NewConflict(action.GetResource().GroupResource(), "fake name", errors.New("fake conflict"))
+				})
+			}
+			setupMetrics()
+			informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+			podInformer := informerFactory.Core().V1().Pods()
+			pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
+
+			c, err := NewController(fakeKubeClient, podInformer, pvcInformer)
+			if err != nil {
+				t.Fatalf("error creating ephemeral controller : %v", err)
+			}
+			// Fail with a clear message instead of a nil dereference when the
+			// constructor returns a different implementation.
+			ec, ok := c.(*ephemeralController)
+			if !ok {
+				t.Fatalf("unexpected controller type %T", c)
+			}
+
+			// Ensure informers are up-to-date.
+			go informerFactory.Start(ctx.Done())
+			informerFactory.WaitForCacheSync(ctx.Done())
+			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced)
+
+			err = ec.syncHandler(ctx, tc.podKey)
+			if err != nil && !tc.expectedError {
+				t.Fatalf("unexpected error while running handler: %v", err)
+			}
+			if err == nil && tc.expectedError {
+				t.Fatalf("unexpected success")
+			}
+
+			// An empty namespace lists the PVCs of all namespaces.
+			pvcs, err := fakeKubeClient.CoreV1().PersistentVolumeClaims("").List(ctx, metav1.ListOptions{})
+			if err != nil {
+				t.Fatalf("unexpected error while listing PVCs: %v", err)
+			}
+			assert.Equal(t, sortPVCs(tc.expectedPVCs), sortPVCs(pvcs.Items))
+			expectMetrics(t, tc.expectedMetrics)
+		})
+	}
+}
+
+// makePVC returns a PVC with the given name and namespace and an empty spec.
+// A non-nil owner becomes the only owner reference.
+func makePVC(name, namespace string, owner *metav1.OwnerReference) *v1.PersistentVolumeClaim {
+	pvc := &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
+		Spec:       v1.PersistentVolumeClaimSpec{},
+	}
+	if owner != nil {
+		pvc.OwnerReferences = []metav1.OwnerReference{*owner}
+	}
+
+	return pvc
+}
+
+// makeEphemeralVolume returns a volume with the given name whose source is an
+// ephemeral volume with an empty claim template.
+func makeEphemeralVolume(name string) *v1.Volume {
+	return &v1.Volume{
+		Name: name,
+		VolumeSource: v1.VolumeSource{
+			Ephemeral: &v1.EphemeralVolumeSource{
+				VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{},
+			},
+		},
+	}
+}
+
+// makePod returns a pod with the given name, namespace, UID and volumes.
+func makePod(name, namespace string, uid types.UID, volumes ...v1.Volume) *v1.Pod {
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, UID: uid},
+		Spec: v1.PodSpec{
+			Volumes: volumes,
+		},
+	}
+
+	return pod
+}
+
+// podKey returns the key that kcache.DeletionHandlingMetaNamespaceKeyFunc
+// computes for pod; this is the string handed to syncHandler.
+func podKey(pod *v1.Pod) string {
+	// The error is dropped on purpose: this helper has no *testing.T, and a
+	// bad key would make the affected test case fail anyway.
+	key, _ := kcache.DeletionHandlingMetaNamespaceKeyFunc(pod)
+	return key
+}
+
+// makeOwnerReference returns an owner reference that points to pod. The
+// Controller field is set to isController and BlockOwnerDeletion is always
+// true.
+func makeOwnerReference(pod *v1.Pod, isController bool) *metav1.OwnerReference {
+	isTrue := true
+	return &metav1.OwnerReference{
+		APIVersion:         "v1",
+		Kind:               "Pod",
+		Name:               pod.Name,
+		UID:                pod.UID,
+		Controller:         &isController,
+		BlockOwnerDeletion: &isTrue,
+	}
+}
+
+// sortPVCs sorts pvcs in place by namespace and then by name, and returns the
+// same slice so that the call can be used inline.
+func sortPVCs(pvcs []v1.PersistentVolumeClaim) []v1.PersistentVolumeClaim {
+	sort.Slice(pvcs, func(i, j int) bool {
+		// The name only decides between claims of the same namespace;
+		// comparing it unconditionally would not be a consistent order.
+		if pvcs[i].Namespace != pvcs[j].Namespace {
+			return pvcs[i].Namespace < pvcs[j].Namespace
+		}
+		return pvcs[i].Name < pvcs[j].Name
+	})
+	return pvcs
+}
+
+// createTestClient returns a fake clientset that already contains objects.
+func createTestClient(objects ...runtime.Object) *fake.Clientset {
+	fakeClient := fake.NewSimpleClientset(objects...)
+	return fakeClient
+}
+
+// Metrics helpers
+
+// expectedMetrics holds the counter values that a test case expects after
+// syncHandler has run.
+type expectedMetrics struct {
+	numCreated  int
+	numFailures int
+}
+
+// expectMetrics reads the create-attempt and create-failure counters and
+// reports a test error for each one that differs from em.
+func expectMetrics(t *testing.T, em expectedMetrics) {
+	t.Helper()
+
+	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateAttempts)
+	handleErr(t, err, "ephemeralVolumeCreate")
+	if actualCreated != float64(em.numCreated) {
+		t.Errorf("Expected PVCs to be created %d, got %v", em.numCreated, actualCreated)
+	}
+	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateFailures)
+	handleErr(t, err, "ephemeralVolumeCreate/Conflict")
+	if actualConflicts != float64(em.numFailures) {
+		t.Errorf("Expected PVCs to have conflicts %d, got %v", em.numFailures, actualConflicts)
+	}
+}
+
+// handleErr reports a test error when reading the metric called metricName
+// failed.
+func handleErr(t *testing.T, err error, metricName string) {
+	// Attribute the failure to the caller's line, not to this helper.
+	t.Helper()
+	if err != nil {
+		t.Errorf("Failed to get %s value, err: %v", metricName, err)
+	}
+}
+
+// setupMetrics registers the ephemeral volume counters and resets both of
+// them, so that every test case starts counting from zero.
+func setupMetrics() {
+	ephemeralvolumemetrics.RegisterMetrics()
+	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Reset()
+	ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Reset()
+}
diff --git a/pkg/controller/resourceclaim/doc.go b/pkg/controller/resourceclaim/doc.go
new file mode 100644
index 00000000000..ae45cbaad1d
--- /dev/null
+++ b/pkg/controller/resourceclaim/doc.go
@@ -0,0 +1,21 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package ephemeral implements the controller part of
+// https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1698-generic-ephemeral-volumes
+//
+// It was derived from the expand controller.
+package ephemeral
diff --git a/pkg/controller/resourceclaim/metrics/metrics.go b/pkg/controller/resourceclaim/metrics/metrics.go
new file mode 100644
index 00000000000..20eb9f1f6e4
--- /dev/null
+++ b/pkg/controller/resourceclaim/metrics/metrics.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+// EphemeralVolumeSubsystem is the subsystem name used for the ephemeral volume controller metrics.
+const EphemeralVolumeSubsystem = "ephemeral_volume_controller"
+
+var (
+	// EphemeralVolumeCreateAttempts counts all PersistentVolumeClaims().Create
+	// calls, both successful and unsuccessful.
+	EphemeralVolumeCreateAttempts = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      EphemeralVolumeSubsystem,
+			Name:           "create_total",
+			Help:           "Number of PersistentVolumeClaims creation requests",
+			StabilityLevel: metrics.ALPHA,
+		})
+	// EphemeralVolumeCreateFailures counts only the unsuccessful
+	// PersistentVolumeClaims().Create calls.
+	EphemeralVolumeCreateFailures = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      EphemeralVolumeSubsystem,
+			Name:           "create_failures_total",
+			Help:           "Number of failed PersistentVolumeClaims creation requests",
+			StabilityLevel: metrics.ALPHA,
+		})
+)
+
+var registerMetrics sync.Once
+
+// RegisterMetrics registers both counters with the legacy registry; the sync.Once makes repeated calls no-ops.
+func RegisterMetrics() {
+	registerMetrics.Do(func() {
+		legacyregistry.MustRegister(EphemeralVolumeCreateAttempts)
+		legacyregistry.MustRegister(EphemeralVolumeCreateFailures)
+	})
+}