Refactor agent rpc retry (#6515)

6543
2026-05-01 12:49:40 +02:00
committed by GitHub
parent af313aad34
commit 63fccbed96


@@ -19,7 +19,6 @@ import (
"encoding/json"
"errors"
"strings"
"sync"
"time"
"github.com/cenkalti/backoff/v5"
@@ -35,7 +34,10 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/rpc/proto"
)
var ErrConnectionLost = errors.New("connection to server lost")
var (
ErrConnectionLost = errors.New("connection to server lost")
errNotConnected = errors.New("grpc: not connected")
)
const (
// Set the gRPC version at compile time to compare against the server's version response.
@@ -51,12 +53,12 @@ const (
)
type client struct {
client proto.WoodpeckerClient
conn *grpc.ClientConn
logs chan *proto.LogEntry
connectionLostAt time.Time
connectionLostLock sync.Mutex
// connectionRetryTimeout is the maximum time to wait for a connection to be restored before the agent gives up and exits.
client proto.WoodpeckerClient
conn *grpc.ClientConn
logs chan *proto.LogEntry
// connectionRetryTimeout is the maximum time to wait for a connection to be
// restored before the agent gives up and exits. Zero means infinite.
// Maps directly onto backoff.WithMaxElapsedTime.
connectionRetryTimeout time.Duration
}
@@ -86,31 +88,108 @@ func SetConnectionRetryTimeout(d time.Duration) ClientOption {
}
}
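For orientation, a minimal usage sketch of the option above. NewGrpcClient is a hypothetical stand-in for this package's real constructor; only SetConnectionRetryTimeout and the zero-means-infinite rule come from the code in this file.

    // Sketch, not part of this change; NewGrpcClient is a hypothetical
    // constructor name and conn is an already-dialed *grpc.ClientConn.
    // A zero timeout keeps the old behaviour of retrying forever; a positive
    // value bounds the total reconnect window before ErrConnectionLost.
    peer := NewGrpcClient(conn, SetConnectionRetryTimeout(5*time.Minute))
    _ = peer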
// IsConnected reports whether the underlying gRPC connection is currently up.
// It is a pure observer with no side effects.
func (c *client) IsConnected() bool {
state := c.conn.GetState()
connected := state == connectivity.Ready || state == connectivity.Idle
c.connectionLostLock.Lock()
defer c.connectionLostLock.Unlock()
if !connected && c.connectionLostAt.IsZero() {
c.connectionLostAt = time.Now()
} else if connected && !c.connectionLostAt.IsZero() {
c.connectionLostAt = time.Time{}
}
return connected
return state == connectivity.Ready || state == connectivity.Idle
}
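IsConnected treats both Ready and Idle as connected because an idle ClientConn transparently re-dials on the next RPC. For context, a small sketch of the grpc-go connectivity API it builds on; GetState and WaitForStateChange are real ClientConn methods, the helper itself is illustrative and assumes the context, grpc, and connectivity imports already present in this file.

    // Sketch, not part of this change: block until the connection is usable
    // or the context ends.
    func waitUsable(ctx context.Context, conn *grpc.ClientConn) bool {
        for {
            s := conn.GetState()
            if s == connectivity.Ready || s == connectivity.Idle {
                return true
            }
            // WaitForStateChange returns false once ctx is done.
            if !conn.WaitForStateChange(ctx, s) {
                return false
            }
        }
    }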
func (c *client) shouldGiveUp() bool {
if c.connectionRetryTimeout == 0 || c.connectionLostAt.IsZero() {
return false
}
return time.Since(c.connectionLostAt) > c.connectionRetryTimeout
}
func (c *client) newBackOff() backoff.BackOff {
// retryOpts returns the backoff options used for every retry loop in this
// file. The exponential backoff parameters preserve the original tuning
// (10 ms initial, 10 s cap), and connectionRetryTimeout is wired straight into
// WithMaxElapsedTime — when it elapses, backoff.Retry returns the last error,
// which we translate into ErrConnectionLost in retryRPC.
func (c *client) retryOpts(op string) []backoff.RetryOption {
b := backoff.NewExponentialBackOff()
b.MaxInterval = 10 * time.Second //nolint:mnd
b.InitialInterval = 10 * time.Millisecond //nolint:mnd
return b
notify := func(err error, next time.Duration) {
// The "too_many_pings" GOAWAY is well-known noise; demote to trace.
// See https://github.com/woodpecker-ci/woodpecker/issues/717
if strings.Contains(err.Error(), `"too_many_pings"`) {
log.Trace().Err(err).Dur("retry_in", next).Msgf("grpc: %s(): too many keepalive pings without sending data", op)
return
}
if errors.Is(err, errNotConnected) {
log.Warn().Dur("retry_in", next).Msgf("grpc: %s() waiting for server connection...", op)
return
}
log.Warn().Err(err).Dur("retry_in", next).Msgf("grpc error: %s(): code: %v", op, status.Code(err))
}
return []backoff.RetryOption{
backoff.WithBackOff(b),
backoff.WithMaxElapsedTime(c.connectionRetryTimeout),
backoff.WithNotify(notify),
}
}
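A standalone sketch of the backoff/v5 pieces retryOpts assembles. The operation below always fails, so once the elapsed-time budget passed to WithMaxElapsedTime is spent, Retry returns the last error; that is the signal retryRPC later turns into ErrConnectionLost. The 2-second budget is an arbitrary demo value.

    // Sketch, not part of this change.
    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "github.com/cenkalti/backoff/v5"
    )

    func main() {
        // Same exponential tuning as retryOpts: 10 ms initial, 10 s cap.
        b := backoff.NewExponentialBackOff()
        b.InitialInterval = 10 * time.Millisecond
        b.MaxInterval = 10 * time.Second

        errDown := errors.New("still down")
        _, err := backoff.Retry(context.Background(),
            func() (struct{}, error) { return struct{}{}, errDown },
            backoff.WithBackOff(b),
            backoff.WithMaxElapsedTime(2*time.Second), // demo value
            backoff.WithNotify(func(err error, next time.Duration) {
                fmt.Printf("retrying in %v after: %v\n", next, err)
            }),
        )
        fmt.Println(err) // the last error once the time budget is exhausted
    }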
// retryRPC is the workhorse used by every RPC method in this file. It runs op
// under backoff.Retry with the standard options, and translates the few
// special outcomes the callers care about:
//
// - op succeeds -> (result, nil)
// - ctx canceled -> (zero, nil) same contract as before
// - MaxElapsedTime hit -> (zero, ErrConnectionLost)
// - permanent (fatal) -> (zero, underlying err)
//
// The op closure is responsible for:
// - returning errNotConnected when IsConnected() is false (Retry will sleep
// and call again — same effect as the old "if !c.IsConnected()" preamble)
// - returning backoff.Permanent(err) for unrecoverable gRPC codes
// - returning the raw error for retryable codes (Aborted/DataLoss/...)
func retryRPC[T any](ctx context.Context, c *client, opName string, op backoff.Operation[T]) (T, error) {
res, err := backoff.Retry(ctx, op, c.retryOpts(opName)...)
if err == nil {
return res, nil
}
var zero T
// Context canceled while inside Retry: callers historically swallowed this
// and returned zero values with a nil error, so preserve that contract.
if ctxErr := context.Cause(ctx); ctxErr != nil && errors.Is(err, ctxErr) {
log.Debug().Err(err).Msgf("grpc: %s(): context canceled", opName)
return zero, nil
}
// MaxElapsedTime exhausted while we were still in errNotConnected — give up.
if errors.Is(err, errNotConnected) {
log.Error().Msg("grpc: connection lost, giving up")
return zero, ErrConnectionLost
}
log.Error().Err(err).Msgf("grpc error: %s(): code: %v", opName, status.Code(err))
return zero, err
}
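To make that contract concrete, this is the shape every rewritten method below follows. The Ping RPC is purely hypothetical; only retryRPC, classifyRPCErr, IsConnected, and errNotConnected come from this file.

    // Hypothetical example; there is no Ping RPC in the proto definition.
    func (c *client) Ping(ctx context.Context) error {
        req := &proto.PingRequest{} // hypothetical message type
        _, err := retryRPC(ctx, c, "ping", func() (*proto.Empty, error) {
            if !c.IsConnected() {
                return nil, errNotConnected // Retry sleeps and calls us again
            }
            r, err := c.client.Ping(ctx, req) // hypothetical stub method
            return r, classifyRPCErr(ctx, err)
        })
        return err // nil, ErrConnectionLost, or a permanent gRPC error
    }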
// classifyRPCErr inspects a gRPC error and returns either the same error (for
// retryable codes) or a backoff.Permanent wrapping it (for fatal codes). It is
// the single source of truth for which gRPC codes are worth retrying.
func classifyRPCErr(ctx context.Context, err error) error {
if err == nil {
return nil
}
switch status.Code(err) {
case codes.Canceled:
// If our own ctx is dead, surface that as the cause so Retry's
// context.Cause(ctx) check exits cleanly. Otherwise it's a server-side
// cancel that we treat as permanent.
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
}
return backoff.Permanent(err)
case codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
return err
default:
return backoff.Permanent(err)
}
}
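As a quick reminder of the backoff/v5 semantics classifyRPCErr relies on: a plain error keeps the retry loop alive, while backoff.Permanent stops it immediately and Retry hands the fatal error back. A toy illustration, not part of this change, assuming the same imports as the sketch above.

    // Toy illustration of retryable vs. permanent errors under backoff/v5.
    func permanentDemo() error {
        attempts := 0
        _, err := backoff.Retry(context.Background(), func() (struct{}, error) {
            attempts++
            if attempts < 3 {
                return struct{}{}, errors.New("transient") // retried with backoff
            }
            return struct{}{}, backoff.Permanent(errors.New("fatal")) // stops Retry
        }, backoff.WithBackOff(backoff.NewExponentialBackOff()))
        // attempts == 3; err carries the "fatal" error, with no further retries.
        return err
    }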
// Version returns the server and gRPC version.
@@ -127,71 +206,27 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
// Next returns the next workflow in the queue.
func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, error) {
var res *proto.NextResponse
var err error
retry := c.newBackOff()
req := new(proto.NextRequest)
req.Filter = new(proto.Filter)
req.Filter.Labels = filter.Labels
for {
req := &proto.NextRequest{Filter: &proto.Filter{Labels: filter.Labels}}
res, err := retryRPC(ctx, c, "next", func() (*proto.NextResponse, error) {
if !c.IsConnected() {
if c.shouldGiveUp() {
log.Error().Msg("grpc: connection lost, giving up")
return nil, ErrConnectionLost
}
log.Warn().Msg("grpc: next() waiting for server connection...")
time.Sleep(retry.NextBackOff())
continue
}
res, err = c.client.Next(ctx, req)
if err == nil {
break
}
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: next(): context canceled")
return nil, nil
}
log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err))
return nil, err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
// TODO: remove after adding continuous data exchange by something like #536
if strings.Contains(err.Error(), "\"too_many_pings\"") {
// https://github.com/woodpecker-ci/woodpecker/issues/717#issuecomment-1049365104
log.Trace().Err(err).Msg("grpc: to many keepalive pings without sending data")
} else {
log.Warn().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err))
}
default:
log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err))
return nil, err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return nil, nil
return nil, errNotConnected
}
r, err := c.client.Next(ctx, req)
return r, classifyRPCErr(ctx, err)
})
if err != nil {
return nil, err
}
if res.GetWorkflow() == nil {
if res == nil || res.GetWorkflow() == nil {
return nil, nil
}
w := new(rpc.Workflow)
w.ID = res.GetWorkflow().GetId()
w.Timeout = res.GetWorkflow().GetTimeout()
w.Config = new(backend_types.Config)
w := &rpc.Workflow{
ID: res.GetWorkflow().GetId(),
Timeout: res.GetWorkflow().GetTimeout(),
Config: new(backend_types.Config),
}
if err := json.Unmarshal(res.GetWorkflow().GetPayload(), w.Config); err != nil {
log.Error().Err(err).Msgf("could not unmarshal workflow config of '%s'", w.ID)
}
@@ -200,289 +235,106 @@ func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, er
// Wait blocks until the workflow with the given ID is marked as completed or canceled by the server.
func (c *client) Wait(ctx context.Context, workflowID string) (canceled bool, err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = workflowID
for {
req := &proto.WaitRequest{Id: workflowID}
resp, err := retryRPC(ctx, c, "wait", func() (*proto.WaitResponse, error) {
if !c.IsConnected() {
if c.shouldGiveUp() {
log.Error().Msg("grpc: connection lost, giving up")
return false, ErrConnectionLost
}
log.Warn().Msg("grpc: wait() waiting for server connection...")
time.Sleep(retry.NextBackOff())
continue
}
resp, err := c.client.Wait(ctx, req)
if err == nil {
// wait block was released normally as expected by server
return resp.GetCanceled(), nil
}
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: wait(): context canceled")
return false, nil
}
log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
return false, err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
default:
log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
return false, err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return false, ctx.Err()
return nil, errNotConnected
}
r, err := c.client.Wait(ctx, req)
return r, classifyRPCErr(ctx, err)
})
if err != nil {
return false, err
}
if resp == nil {
return false, nil
}
return resp.GetCanceled(), nil
}
// Init signals the workflow is initialized.
func (c *client) Init(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) {
retry := c.newBackOff()
req := new(proto.InitRequest)
req.Id = workflowID
req.State = new(proto.WorkflowState)
req.State.Started = state.Started
req.State.Finished = state.Finished
req.State.Error = state.Error
req.State.Canceled = state.Canceled
for {
if !c.IsConnected() {
if c.shouldGiveUp() {
log.Error().Msg("grpc: connection lost, giving up")
return ErrConnectionLost
}
log.Warn().Msg("grpc: init() waiting for server connection...")
time.Sleep(retry.NextBackOff())
continue
}
_, err = c.client.Init(ctx, req)
if err == nil {
break
}
log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: init(): context canceled")
return nil
}
log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
return err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
log.Warn().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
default:
log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
return err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
func (c *client) Init(ctx context.Context, workflowID string, state rpc.WorkflowState) error {
req := &proto.InitRequest{
Id: workflowID,
State: &proto.WorkflowState{
Started: state.Started,
Finished: state.Finished,
Error: state.Error,
Canceled: state.Canceled,
},
}
return nil
_, err := retryRPC(ctx, c, "init", func() (*proto.Empty, error) {
if !c.IsConnected() {
return nil, errNotConnected
}
r, err := c.client.Init(ctx, req)
return r, classifyRPCErr(ctx, err)
})
return err
}
// Done lets the agent signal to the server that the workflow has stopped.
func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) {
retry := c.newBackOff()
req := new(proto.DoneRequest)
req.Id = workflowID
req.State = new(proto.WorkflowState)
req.State.Started = state.Started
req.State.Finished = state.Finished
req.State.Error = state.Error
req.State.Canceled = state.Canceled
for {
if !c.IsConnected() {
if c.shouldGiveUp() {
log.Error().Msg("grpc: connection lost, giving up")
return ErrConnectionLost
}
log.Warn().Msg("grpc: done() waiting for server connection...")
time.Sleep(retry.NextBackOff())
continue
}
_, err = c.client.Done(ctx, req)
if err == nil {
break
}
log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: done(): context canceled")
return nil
}
log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
return err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
log.Warn().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
default:
log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
return err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) error {
req := &proto.DoneRequest{
Id: workflowID,
State: &proto.WorkflowState{
Started: state.Started,
Finished: state.Finished,
Error: state.Error,
Canceled: state.Canceled,
},
}
return nil
_, err := retryRPC(ctx, c, "done", func() (*proto.Empty, error) {
if !c.IsConnected() {
return nil, errNotConnected
}
r, err := c.client.Done(ctx, req)
return r, classifyRPCErr(ctx, err)
})
return err
}
// Extend extends the workflow deadline.
func (c *client) Extend(ctx context.Context, workflowID string) (err error) {
retry := c.newBackOff()
req := new(proto.ExtendRequest)
req.Id = workflowID
for {
func (c *client) Extend(ctx context.Context, workflowID string) error {
req := &proto.ExtendRequest{Id: workflowID}
_, err := retryRPC(ctx, c, "extend", func() (*proto.Empty, error) {
if !c.IsConnected() {
if c.shouldGiveUp() {
log.Error().Msg("grpc: connection lost, giving up")
return ErrConnectionLost
}
log.Warn().Msg("grpc: extend() waiting for server connection...")
time.Sleep(retry.NextBackOff())
continue
return nil, errNotConnected
}
_, err = c.client.Extend(ctx, req)
if err == nil {
break
}
log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: extend(): context canceled")
return nil
}
log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
return err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
log.Warn().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
default:
log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
return err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
r, err := c.client.Extend(ctx, req)
return r, classifyRPCErr(ctx, err)
})
return err
}
// Update lets the agent update the step state at the server.
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = workflowID
req.State = new(proto.StepState)
req.State.StepUuid = state.StepUUID
req.State.Started = state.Started
req.State.Finished = state.Finished
req.State.Exited = state.Exited
req.State.ExitCode = int32(state.ExitCode)
req.State.Error = state.Error
req.State.Canceled = state.Canceled
req.State.Skipped = state.Skipped
for {
if !c.IsConnected() {
if c.shouldGiveUp() {
log.Error().Msg("grpc: connection lost, giving up")
return ErrConnectionLost
}
log.Warn().Msg("grpc: update() waiting for server connection...")
time.Sleep(retry.NextBackOff())
continue
}
_, err = c.client.Update(ctx, req)
if err == nil {
break
}
log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: update(): context canceled")
return nil
}
log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
return err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
log.Warn().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
default:
log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
return err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) error {
req := &proto.UpdateRequest{
Id: workflowID,
State: &proto.StepState{
StepUuid: state.StepUUID,
Started: state.Started,
Finished: state.Finished,
Exited: state.Exited,
ExitCode: int32(state.ExitCode),
Error: state.Error,
Canceled: state.Canceled,
Skipped: state.Skipped,
},
}
return nil
_, err := retryRPC(ctx, c, "update", func() (*proto.Empty, error) {
if !c.IsConnected() {
return nil, errNotConnected
}
r, err := c.client.Update(ctx, req)
return r, classifyRPCErr(ctx, err)
})
return err
}
// EnqueueLog queues the log entry to be written in a batch later.
@@ -545,53 +397,26 @@ func (c *client) processLogs(ctx context.Context) {
func (c *client) sendLogs(ctx context.Context, entries []*proto.LogEntry) error {
req := &proto.LogRequest{LogEntries: entries}
retry := c.newBackOff()
for {
_, err := c.client.Log(ctx, req)
if err == nil {
break
}
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: log(): context canceled")
return nil
}
log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err))
return err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
log.Warn().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err))
default:
log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err))
return err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
// sendLogs intentionally does not gate on IsConnected — the original code
// didn't either. backoff.Retry will keep trying through transient transport
// errors until MaxElapsedTime elapses.
_, err := retryRPC(ctx, c, "log", func() (*proto.Empty, error) {
r, err := c.client.Log(ctx, req)
return r, classifyRPCErr(ctx, err)
})
return err
}
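To place sendLogs in context: EnqueueLog pushes entries onto the c.logs channel and processLogs drains them in batches. A rough, illustrative batching loop follows; the batch size and flush interval are assumed values, and the real processLogs above may differ.

    // Illustrative sketch only; not the actual processLogs implementation.
    func (c *client) processLogsSketch(ctx context.Context) {
        const maxBatch = 50                   // assumed batch size
        ticker := time.NewTicker(time.Second) // assumed flush interval
        defer ticker.Stop()

        batch := make([]*proto.LogEntry, 0, maxBatch)
        flush := func() {
            if len(batch) == 0 {
                return
            }
            if err := c.sendLogs(ctx, batch); err != nil {
                log.Error().Err(err).Msg("could not send log batch")
            }
            batch = batch[:0]
        }

        for {
            select {
            case entry, ok := <-c.logs:
                if !ok {
                    flush()
                    return
                }
                batch = append(batch, entry)
                if len(batch) >= maxBatch {
                    flush()
                }
            case <-ticker.C:
                flush()
            case <-ctx.Done():
                flush()
                return
            }
        }
    }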
func (c *client) RegisterAgent(ctx context.Context, info rpc.AgentInfo) (int64, error) {
req := new(proto.RegisterAgentRequest)
req.Info = &proto.AgentInfo{
Platform: info.Platform,
Backend: info.Backend,
Version: info.Version,
Capacity: int32(info.Capacity),
CustomLabels: info.CustomLabels,
req := &proto.RegisterAgentRequest{
Info: &proto.AgentInfo{
Platform: info.Platform,
Backend: info.Backend,
Version: info.Version,
Capacity: int32(info.Capacity),
CustomLabels: info.CustomLabels,
},
}
res, err := c.client.RegisterAgent(ctx, req)
@@ -603,53 +428,15 @@ func (c *client) UnregisterAgent(ctx context.Context) error {
return err
}
func (c *client) ReportHealth(ctx context.Context) (err error) {
retry := c.newBackOff()
req := new(proto.ReportHealthRequest)
req.Status = "I am alive!"
func (c *client) ReportHealth(ctx context.Context) error {
req := &proto.ReportHealthRequest{Status: "I am alive!"}
for {
_, err := retryRPC(ctx, c, "report_health", func() (*proto.Empty, error) {
if !c.IsConnected() {
if c.shouldGiveUp() {
log.Error().Msg("grpc: connection lost, giving up")
return ErrConnectionLost
}
log.Warn().Msg("grpc: report_health() waiting for server connection...")
time.Sleep(retry.NextBackOff())
continue
return nil, errNotConnected
}
_, err = c.client.ReportHealth(ctx, req)
if err == nil {
return nil
}
switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: report_health(): context canceled")
return nil
}
log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err))
return err
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal:
// non-fatal errors
log.Warn().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err))
case
// code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 1.2.3.4:443: i/o timeout\""
codes.Unavailable:
default:
log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err))
return err
}
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
r, err := c.client.ReportHealth(ctx, req)
return r, classifyRPCErr(ctx, err)
})
return err
}