Merge pull request #107326 from MikeSpreitzer/apf-concurrentize-config-producer

Concurrentize pkg/registry/flowcontrol/ensurer/strategy.go
This commit is contained in:
Kubernetes Prow Robot 2022-01-05 09:23:34 -08:00 committed by GitHub
commit 23ee308ed7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -219,19 +219,27 @@ func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy,
name := bootstrap.GetName() name := bootstrap.GetName()
configurationType := strategy.Name() configurationType := strategy.Name()
current, err := wrapper.Get(bootstrap.GetName()) var current configurationObject
if err != nil { var err error
for {
current, err = wrapper.Get(bootstrap.GetName())
if err == nil {
break
}
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
} }
// we always re-create a missing configuration object // we always re-create a missing configuration object
if _, err := wrapper.Create(bootstrap); err != nil { if _, err = wrapper.Create(bootstrap); err == nil {
return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name)
return nil
} }
klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name) if !apierrors.IsAlreadyExists(err) {
return nil return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
}
klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", wrapper.TypeName()), "type", configurationType, "name", name)
} }
klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", wrapper.TypeName()), "type", configurationType, "name", name)
@ -247,12 +255,17 @@ func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy,
return nil return nil
} }
if _, err := wrapper.Update(newObject); err != nil { if _, err = wrapper.Update(newObject); err == nil {
return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, newObject))
return nil
} }
klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, newObject)) if apierrors.IsConflict(err) {
return nil klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", wrapper.TypeName()), "type", configurationType, "name", name)
return nil
}
return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
} }
func removeConfiguration(wrapper configurationWrapper, name string) error { func removeConfiguration(wrapper configurationWrapper, name string) error {
@ -281,6 +294,7 @@ func removeConfiguration(wrapper configurationWrapper, name string) error {
if err := wrapper.Delete(name); err != nil { if err := wrapper.Delete(name); err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
klog.V(5).InfoS(fmt.Sprintf("Something concurrently deleted the %s", wrapper.TypeName()), "name", name)
return nil return nil
} }