mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Get rid of unused flowSchemaRVs in P&F
This commit is contained in:
parent
73211256e8
commit
c79a0a0882
@ -327,7 +327,7 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
|
|||||||
|
|
||||||
func(obj interface{}) {
|
func(obj interface{}) {
|
||||||
defer cfgCtlr.configQueue.Done(obj)
|
defer cfgCtlr.configQueue.Done(obj)
|
||||||
specificDelay, err := cfgCtlr.syncOne(map[string]string{})
|
specificDelay, err := cfgCtlr.syncOne()
|
||||||
switch {
|
switch {
|
||||||
case err != nil:
|
case err != nil:
|
||||||
klog.Error(err)
|
klog.Error(err)
|
||||||
@ -346,7 +346,7 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
|
|||||||
// objects that configure API Priority and Fairness and updates the
|
// objects that configure API Priority and Fairness and updates the
|
||||||
// local configController accordingly.
|
// local configController accordingly.
|
||||||
// Only invoke this in the one and only worker goroutine
|
// Only invoke this in the one and only worker goroutine
|
||||||
func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) {
|
func (cfgCtlr *configController) syncOne() (specificDelay time.Duration, err error) {
|
||||||
klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt))
|
klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt))
|
||||||
all := labels.Everything()
|
all := labels.Everything()
|
||||||
newPLs, err := cfgCtlr.plLister.List(all)
|
newPLs, err := cfgCtlr.plLister.List(all)
|
||||||
@ -357,7 +357,7 @@ func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (speci
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err)
|
return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err)
|
||||||
}
|
}
|
||||||
return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs)
|
return cfgCtlr.digestConfigObjects(newPLs, newFSs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cfgMeal is the data involved in the process of digesting the API
|
// cfgMeal is the data involved in the process of digesting the API
|
||||||
@ -398,7 +398,7 @@ type fsStatusUpdate struct {
|
|||||||
// digestConfigObjects is given all the API objects that configure
|
// digestConfigObjects is given all the API objects that configure
|
||||||
// cfgCtlr and writes its consequent new configState.
|
// cfgCtlr and writes its consequent new configState.
|
||||||
// Only invoke this in the one and only worker goroutine
|
// Only invoke this in the one and only worker goroutine
|
||||||
func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) {
|
func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) (time.Duration, error) {
|
||||||
fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
|
fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
|
||||||
var errs []error
|
var errs []error
|
||||||
currResult := updateAttempt{
|
currResult := updateAttempt{
|
||||||
@ -427,11 +427,9 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior
|
|||||||
fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
|
fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
|
||||||
patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc)))
|
patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc)))
|
||||||
patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
|
patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
|
||||||
patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
|
_, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
|
||||||
if err == nil {
|
if err != nil {
|
||||||
key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema)
|
if apierrors.IsNotFound(err) {
|
||||||
flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion
|
|
||||||
} else if apierrors.IsNotFound(err) {
|
|
||||||
// This object has been deleted. A notification is coming
|
// This object has been deleted. A notification is coming
|
||||||
// and nothing more needs to be done here.
|
// and nothing more needs to be done here.
|
||||||
klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
|
klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
|
||||||
@ -439,6 +437,7 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior
|
|||||||
errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err))
|
errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
cfgCtlr.addUpdateResult(currResult)
|
cfgCtlr.addUpdateResult(currResult)
|
||||||
|
|
||||||
return suggestedDelay, utilerrors.NewAggregate(errs)
|
return suggestedDelay, utilerrors.NewAggregate(errs)
|
||||||
|
Loading…
Reference in New Issue
Block a user