Merge pull request #20007 from smarterclayton/ipallocrace

19848: Retry service IP repair on conflict
This commit is contained in:
Zach Loafman 2016-01-22 15:02:05 -08:00
commit 0791de15e3
2 changed files with 23 additions and 4 deletions

View File

@ -22,6 +22,8 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/registry/service"
"k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/util"
@ -72,6 +74,11 @@ func (c *Repair) RunUntil(ch chan struct{}) {
// RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) RunOnce() error {
return client.RetryOnConflict(client.DefaultBackoff, c.runOnce)
}
// runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
// or if they are executed against different leaders,
// the ordering guarantee required to ensure no IP is allocated twice is violated.
@ -127,12 +134,14 @@ func (c *Repair) RunOnce() error {
}
}
err = r.Snapshot(latest)
if err != nil {
return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
if err := r.Snapshot(latest); err != nil {
return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(latest); err != nil {
if errors.IsConflict(err) {
return err
}
return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
}
return nil

View File

@ -21,6 +21,8 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/registry/service"
"k8s.io/kubernetes/pkg/registry/service/portallocator"
"k8s.io/kubernetes/pkg/util"
@ -57,6 +59,11 @@ func (c *Repair) RunUntil(ch chan struct{}) {
// RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) RunOnce() error {
return client.RetryOnConflict(client.DefaultBackoff, c.runOnce)
}
// runOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
// or if they are executed against different leaders,
// the ordering guarantee required to ensure no port is allocated twice is violated.
@ -116,10 +123,13 @@ func (c *Repair) RunOnce() error {
err = r.Snapshot(latest)
if err != nil {
return fmt.Errorf("unable to persist the updated port allocations: %v", err)
return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(latest); err != nil {
if errors.IsConflict(err) {
return err
}
return fmt.Errorf("unable to persist the updated port allocations: %v", err)
}
return nil