mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-17 15:50:10 +00:00
refactor: use the provided sharedInformerFactory for params
This commit is contained in:
parent
9a595209bd
commit
6323c106e9
@ -134,6 +134,7 @@ func NewAdmissionController(
|
|||||||
restMapper,
|
restMapper,
|
||||||
client,
|
client,
|
||||||
dynamicClient,
|
dynamicClient,
|
||||||
|
informerFactory,
|
||||||
nil,
|
nil,
|
||||||
NewMatcher(matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client)),
|
NewMatcher(matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client)),
|
||||||
generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy](
|
generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy](
|
||||||
|
@ -40,7 +40,6 @@ import (
|
|||||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
k8sscheme "k8s.io/client-go/kubernetes/scheme"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -48,6 +47,7 @@ type policyController struct {
|
|||||||
once sync.Once
|
once sync.Once
|
||||||
context context.Context
|
context context.Context
|
||||||
dynamicClient dynamic.Interface
|
dynamicClient dynamic.Interface
|
||||||
|
informerFactory informers.SharedInformerFactory
|
||||||
restMapper meta.RESTMapper
|
restMapper meta.RESTMapper
|
||||||
policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy]
|
policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy]
|
||||||
policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding]
|
policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding]
|
||||||
@ -61,13 +61,10 @@ type policyController struct {
|
|||||||
|
|
||||||
newValidator
|
newValidator
|
||||||
|
|
||||||
// Lock which protects:
|
client kubernetes.Interface
|
||||||
// - cachedPolicies
|
// Lock which protects
|
||||||
// - paramCRDControllers
|
// All Below fields
|
||||||
// - definitionInfo
|
// All above fields should be assumed constant
|
||||||
// - bindingInfos
|
|
||||||
// - definitionsToBindings
|
|
||||||
// All other fields should be assumed constant
|
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
|
|
||||||
cachedPolicies []policyData
|
cachedPolicies []policyData
|
||||||
@ -88,8 +85,6 @@ type policyController struct {
|
|||||||
// All keys must have at least one dependent binding
|
// All keys must have at least one dependent binding
|
||||||
// All binding names MUST exist as a key bindingInfos
|
// All binding names MUST exist as a key bindingInfos
|
||||||
definitionsToBindings map[namespacedName]sets.Set[namespacedName]
|
definitionsToBindings map[namespacedName]sets.Set[namespacedName]
|
||||||
|
|
||||||
client kubernetes.Interface
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type newValidator func(validationFilter cel.Filter, celMatcher matchconditions.Matcher, auditAnnotationFilter, messageFilter cel.Filter, failurePolicy *v1.FailurePolicyType) Validator
|
type newValidator func(validationFilter cel.Filter, celMatcher matchconditions.Matcher, auditAnnotationFilter, messageFilter cel.Filter, failurePolicy *v1.FailurePolicyType) Validator
|
||||||
@ -98,6 +93,7 @@ func newPolicyController(
|
|||||||
restMapper meta.RESTMapper,
|
restMapper meta.RESTMapper,
|
||||||
client kubernetes.Interface,
|
client kubernetes.Interface,
|
||||||
dynamicClient dynamic.Interface,
|
dynamicClient dynamic.Interface,
|
||||||
|
informerFactory informers.SharedInformerFactory,
|
||||||
filterCompiler cel.FilterCompiler,
|
filterCompiler cel.FilterCompiler,
|
||||||
matcher Matcher,
|
matcher Matcher,
|
||||||
policiesInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicy],
|
policiesInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicy],
|
||||||
@ -128,9 +124,10 @@ func newPolicyController(
|
|||||||
Name: "cel-policy-bindings",
|
Name: "cel-policy-bindings",
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
restMapper: restMapper,
|
restMapper: restMapper,
|
||||||
dynamicClient: dynamicClient,
|
dynamicClient: dynamicClient,
|
||||||
client: client,
|
informerFactory: informerFactory,
|
||||||
|
client: client,
|
||||||
}
|
}
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
@ -235,7 +232,6 @@ func (c *policyController) reconcilePolicyDefinitionSpec(namespace, name string,
|
|||||||
// Skip setting up controller for empty param type
|
// Skip setting up controller for empty param type
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// find GVR for params
|
// find GVR for params
|
||||||
// Parse param source into a GVK
|
// Parse param source into a GVK
|
||||||
|
|
||||||
@ -262,101 +258,74 @@ func (c *policyController) reconcilePolicyDefinitionSpec(namespace, name string,
|
|||||||
return info.configurationError
|
return info.configurationError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
paramInfo := c.ensureParamInfo(paramSource, paramsGVR.Resource)
|
||||||
|
paramInfo.dependentDefinitions.Insert(nn)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensures that there is an informer started for the given GVK to be used as a
|
||||||
|
// param
|
||||||
|
func (c *policyController) ensureParamInfo(paramSource *v1alpha1.ParamKind, paramsGVR schema.GroupVersionResource) *paramInfo {
|
||||||
if info, ok := c.paramsCRDControllers[*paramSource]; ok {
|
if info, ok := c.paramsCRDControllers[*paramSource]; ok {
|
||||||
// If a param controller is already active for this paramsource, make
|
return info
|
||||||
// sure it is tracking this policy's dependency upon it
|
}
|
||||||
info.dependentDefinitions.Insert(nn)
|
|
||||||
|
|
||||||
|
// We are not watching this param. Start an informer for it.
|
||||||
|
instanceContext, instanceCancel := context.WithCancel(c.context)
|
||||||
|
|
||||||
|
var informer cache.SharedIndexInformer
|
||||||
|
|
||||||
|
// Try to see if our provided informer factory has an informer for this type.
|
||||||
|
// We assume the informer is already started, and starts all types associated
|
||||||
|
// with it.
|
||||||
|
if genericInformer, err := c.informerFactory.ForResource(paramsGVR); err == nil {
|
||||||
|
informer = genericInformer.Informer()
|
||||||
|
|
||||||
|
// Ensure the informer is started
|
||||||
|
// Use policyController's context rather than the instance context.
|
||||||
|
// PolicyController context is expected to last until app shutdown
|
||||||
|
// This is due to behavior of informerFactory which would cause the
|
||||||
|
// informer to stop running once the context is cancelled, and
|
||||||
|
// never started again.
|
||||||
|
c.informerFactory.Start(c.context.Done())
|
||||||
} else {
|
} else {
|
||||||
instanceContext, instanceCancel := context.WithCancel(c.context)
|
// Dynamic JSON informer fallback.
|
||||||
|
// Cannot use shared dynamic informer since it would be impossible
|
||||||
var informer cache.SharedIndexInformer
|
// to clean CRD informers properly with multiple dependents
|
||||||
|
// (cannot start ahead of time, and cannot track dependencies via stopCh)
|
||||||
// Informer Factory is optional
|
informer = dynamicinformer.NewFilteredDynamicInformer(
|
||||||
if c.client != nil {
|
c.dynamicClient,
|
||||||
// Create temporary informer factory
|
paramsGVR,
|
||||||
// Cannot use the k8s shared informer factory for dynamic params informer.
|
corev1.NamespaceAll,
|
||||||
// Would leak unnecessary informers when we are done since we would have to
|
// Use same interval as is used for k8s typed sharedInformerFactory
|
||||||
// call informerFactory.Start() with a longer-lived stopCh than necessary.
|
// https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430
|
||||||
// SharedInformerFactory does not support temporary usage.
|
10*time.Minute,
|
||||||
dynamicFactory := informers.NewSharedInformerFactory(c.client, 10*time.Minute)
|
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||||
|
nil,
|
||||||
// Look for a typed informer. If it does not exist
|
).Informer()
|
||||||
genericInformer, err := dynamicFactory.ForResource(paramsGVR.Resource)
|
|
||||||
|
|
||||||
// Ignore error. We fallback to dynamic informer if there is no
|
|
||||||
// typed informer
|
|
||||||
if err != nil {
|
|
||||||
informer = nil
|
|
||||||
} else {
|
|
||||||
informer = genericInformer.Informer()
|
|
||||||
|
|
||||||
// Set transformer on the informer to workaround inconsistency
|
|
||||||
// where typed objects have TypeMeta wiped out but dynamic
|
|
||||||
// objects keep kind/apiVersion fields
|
|
||||||
informer.SetTransform(func(i interface{}) (interface{}, error) {
|
|
||||||
// Ensure param is populated with its GVK for consistency
|
|
||||||
// (CRD dynamic informer always returns objects with kind/apiversion,
|
|
||||||
// but native types do not include populated TypeMeta.
|
|
||||||
if param := i.(runtime.Object); param != nil {
|
|
||||||
if param.GetObjectKind().GroupVersionKind().Empty() {
|
|
||||||
// https://github.com/kubernetes/client-go/issues/413#issue-324586398
|
|
||||||
gvks, _, _ := k8sscheme.Scheme.ObjectKinds(param)
|
|
||||||
for _, gvk := range gvks {
|
|
||||||
if len(gvk.Kind) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(gvk.Version) == 0 || gvk.Version == runtime.APIVersionInternal {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
param.GetObjectKind().SetGroupVersionKind(gvk)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return i, nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if informer == nil {
|
|
||||||
// Dynamic JSON informer fallback.
|
|
||||||
// Cannot use shared dynamic informer since it would be impossible
|
|
||||||
// to clean CRD informers properly with multiple dependents
|
|
||||||
// (cannot start ahead of time, and cannot track dependencies via stopCh)
|
|
||||||
informer = dynamicinformer.NewFilteredDynamicInformer(
|
|
||||||
c.dynamicClient,
|
|
||||||
paramsGVR.Resource,
|
|
||||||
corev1.NamespaceAll,
|
|
||||||
// Use same interval as is used for k8s typed sharedInformerFactory
|
|
||||||
// https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430
|
|
||||||
10*time.Minute,
|
|
||||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
|
||||||
nil,
|
|
||||||
).Informer()
|
|
||||||
}
|
|
||||||
|
|
||||||
controller := generic.NewController(
|
|
||||||
generic.NewInformer[runtime.Object](informer),
|
|
||||||
c.reconcileParams,
|
|
||||||
generic.ControllerOptions{
|
|
||||||
Workers: 1,
|
|
||||||
Name: paramSource.String() + "-controller",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
c.paramsCRDControllers[*paramSource] = ¶mInfo{
|
|
||||||
controller: controller,
|
|
||||||
stop: instanceCancel,
|
|
||||||
dependentDefinitions: sets.New(nn),
|
|
||||||
}
|
|
||||||
|
|
||||||
go controller.Run(instanceContext)
|
|
||||||
go informer.Run(instanceContext.Done())
|
go informer.Run(instanceContext.Done())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
controller := generic.NewController(
|
||||||
|
generic.NewInformer[runtime.Object](informer),
|
||||||
|
c.reconcileParams,
|
||||||
|
generic.ControllerOptions{
|
||||||
|
Workers: 1,
|
||||||
|
Name: paramSource.String() + "-controller",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
ret := ¶mInfo{
|
||||||
|
controller: controller,
|
||||||
|
stop: instanceCancel,
|
||||||
|
dependentDefinitions: sets.New[namespacedName](),
|
||||||
|
}
|
||||||
|
c.paramsCRDControllers[*paramSource] = ret
|
||||||
|
|
||||||
|
go controller.Run(instanceContext)
|
||||||
|
return ret
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *policyController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error {
|
func (c *policyController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user