diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index 318bc09383..71ab4404c3 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -19,7 +19,6 @@ import ( "encoding/json" "errors" "strings" - "sync" "time" "github.com/cenkalti/backoff/v5" @@ -35,7 +34,10 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/rpc/proto" ) -var ErrConnectionLost = errors.New("connection to server lost") +var ( + ErrConnectionLost = errors.New("connection to server lost") + errNotConnected = errors.New("grpc: not connected") +) const ( // Set grpc version on compile time to compare against server version response. @@ -51,12 +53,12 @@ const ( ) type client struct { - client proto.WoodpeckerClient - conn *grpc.ClientConn - logs chan *proto.LogEntry - connectionLostAt time.Time - connectionLostLock sync.Mutex - // connectionRetryTimeout is the maximum time to wait for a connection to be restored before the agent gives up and exits. + client proto.WoodpeckerClient + conn *grpc.ClientConn + logs chan *proto.LogEntry + // connectionRetryTimeout is the maximum time to wait for a connection to be + // restored before the agent gives up and exits. Zero means infinite. + // Maps directly onto backoff.WithMaxElapsedTime. connectionRetryTimeout time.Duration } @@ -86,31 +88,108 @@ func SetConnectionRetryTimeout(d time.Duration) ClientOption { } } +// IsConnected reports whether the underlying gRPC connection is currently up. +// It is a pure observer with no side effects. func (c *client) IsConnected() bool { state := c.conn.GetState() - connected := state == connectivity.Ready || state == connectivity.Idle - c.connectionLostLock.Lock() - defer c.connectionLostLock.Unlock() - if !connected && c.connectionLostAt.IsZero() { - c.connectionLostAt = time.Now() - } else if connected && !c.connectionLostAt.IsZero() { - c.connectionLostAt = time.Time{} - } - return connected + return state == connectivity.Ready || state == connectivity.Idle } -func (c *client) shouldGiveUp() bool { - if c.connectionRetryTimeout == 0 || c.connectionLostAt.IsZero() { - return false - } - return time.Since(c.connectionLostAt) > c.connectionRetryTimeout -} - -func (c *client) newBackOff() backoff.BackOff { +// retryOpts returns the backoff options used for every retry loop in this +// file. The exponential backoff parameters preserve the original tuning +// (10 ms initial, 10 s cap), and connectionRetryTimeout is wired straight into +// WithMaxElapsedTime — when it elapses, backoff.Retry returns the last error, +// which we translate into ErrConnectionLost in retryRPC. +func (c *client) retryOpts(op string) []backoff.RetryOption { b := backoff.NewExponentialBackOff() b.MaxInterval = 10 * time.Second //nolint:mnd b.InitialInterval = 10 * time.Millisecond //nolint:mnd - return b + + notify := func(err error, next time.Duration) { + // The "too_many_pings" GOAWAY is well-known noise; demote to trace. 
+ // See https://github.com/woodpecker-ci/woodpecker/issues/717 + if strings.Contains(err.Error(), `"too_many_pings"`) { + log.Trace().Err(err).Dur("retry_in", next).Msgf("grpc: %s(): too many keepalive pings without sending data", op) + return + } + if errors.Is(err, errNotConnected) { + log.Warn().Dur("retry_in", next).Msgf("grpc: %s() waiting for server connection...", op) + return + } + log.Warn().Err(err).Dur("retry_in", next).Msgf("grpc error: %s(): code: %v", op, status.Code(err)) + } + + return []backoff.RetryOption{ + backoff.WithBackOff(b), + backoff.WithMaxElapsedTime(c.connectionRetryTimeout), + backoff.WithNotify(notify), + } +} + +// retryRPC is the workhorse used by every RPC method in this file. It runs op +// under backoff.Retry with the standard options, and translates the few +// special outcomes the callers care about: +// +// - op succeeds -> (result, nil) +// - ctx canceled -> (zero, nil) same contract as before +// - MaxElapsedTime hit -> (zero, ErrConnectionLost) +// - permanent (fatal) -> (zero, underlying err) +// +// The op closure is responsible for: +// - returning errNotConnected when IsConnected() is false (Retry will sleep +// and call again — same effect as the old "if !c.IsConnected()" preamble) +// - returning backoff.Permanent(err) for unrecoverable gRPC codes +// - returning the raw error for retryable codes (Aborted/DataLoss/...) +func retryRPC[T any](ctx context.Context, c *client, opName string, op backoff.Operation[T]) (T, error) { + res, err := backoff.Retry(ctx, op, c.retryOpts(opName)...) + if err == nil { + return res, nil + } + + var zero T + + // Context canceled while inside Retry: callers historically swallowed this + // and returned a zero-value error, so preserve that contract. + if ctxErr := context.Cause(ctx); ctxErr != nil && errors.Is(err, ctxErr) { + log.Debug().Err(err).Msgf("grpc: %s(): context canceled", opName) + return zero, nil + } + + // MaxElapsedTime exhausted while we were still in errNotConnected — give up. + if errors.Is(err, errNotConnected) { + log.Error().Msg("grpc: connection lost, giving up") + return zero, ErrConnectionLost + } + + log.Error().Err(err).Msgf("grpc error: %s(): code: %v", opName, status.Code(err)) + return zero, err +} + +// classifyRPCErr inspects a gRPC error and returns either the same error (for +// retryable codes) or a backoff.Permanent wrapping it (for fatal codes). It is +// the single source of truth for which gRPC codes are worth retrying. +func classifyRPCErr(ctx context.Context, err error) error { + if err == nil { + return nil + } + switch status.Code(err) { + case codes.Canceled: + // If our own ctx is dead, surface that as the cause so Retry's + // context.Cause(ctx) check exits cleanly. Otherwise it's a server-side + // cancel that we treat as permanent. + if ctx.Err() != nil { + return backoff.Permanent(ctx.Err()) + } + return backoff.Permanent(err) + case codes.Aborted, + codes.DataLoss, + codes.DeadlineExceeded, + codes.Internal, + codes.Unavailable: + return err + default: + return backoff.Permanent(err) + } } // Version returns the server- & grpc-version. @@ -127,71 +206,27 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) { // Next returns the next workflow in the queue. 
func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, error) { - var res *proto.NextResponse - var err error - retry := c.newBackOff() - req := new(proto.NextRequest) - req.Filter = new(proto.Filter) - req.Filter.Labels = filter.Labels - for { + req := &proto.NextRequest{Filter: &proto.Filter{Labels: filter.Labels}} + + res, err := retryRPC(ctx, c, "next", func() (*proto.NextResponse, error) { if !c.IsConnected() { - if c.shouldGiveUp() { - log.Error().Msg("grpc: connection lost, giving up") - return nil, ErrConnectionLost - } - log.Warn().Msg("grpc: next() waiting for server connection...") - time.Sleep(retry.NextBackOff()) - continue - } - - res, err = c.client.Next(ctx, req) - if err == nil { - break - } - - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: next(): context canceled") - return nil, nil - } - log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err)) - return nil, err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal, - codes.Unavailable: - // non-fatal errors - // TODO: remove after adding continuous data exchange by something like #536 - if strings.Contains(err.Error(), "\"too_many_pings\"") { - // https://github.com/woodpecker-ci/woodpecker/issues/717#issuecomment-1049365104 - log.Trace().Err(err).Msg("grpc: to many keepalive pings without sending data") - } else { - log.Warn().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err)) - } - default: - log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err)) - return nil, err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return nil, nil + return nil, errNotConnected } + r, err := c.client.Next(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + if err != nil { + return nil, err } - - if res.GetWorkflow() == nil { + if res == nil || res.GetWorkflow() == nil { return nil, nil } - w := new(rpc.Workflow) - w.ID = res.GetWorkflow().GetId() - w.Timeout = res.GetWorkflow().GetTimeout() - w.Config = new(backend_types.Config) + w := &rpc.Workflow{ + ID: res.GetWorkflow().GetId(), + Timeout: res.GetWorkflow().GetTimeout(), + Config: new(backend_types.Config), + } if err := json.Unmarshal(res.GetWorkflow().GetPayload(), w.Config); err != nil { log.Error().Err(err).Msgf("could not unmarshal workflow config of '%s'", w.ID) } @@ -200,289 +235,106 @@ func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, er // Wait blocks until the workflow with the given ID is marked as completed or canceled by the server. 
func (c *client) Wait(ctx context.Context, workflowID string) (canceled bool, err error) { - retry := c.newBackOff() - req := new(proto.WaitRequest) - req.Id = workflowID - for { + req := &proto.WaitRequest{Id: workflowID} + + resp, err := retryRPC(ctx, c, "wait", func() (*proto.WaitResponse, error) { if !c.IsConnected() { - if c.shouldGiveUp() { - log.Error().Msg("grpc: connection lost, giving up") - return false, ErrConnectionLost - } - log.Warn().Msg("grpc: wait() waiting for server connection...") - time.Sleep(retry.NextBackOff()) - continue - } - - resp, err := c.client.Wait(ctx, req) - if err == nil { - // wait block was released normally as expected by server - return resp.GetCanceled(), nil - } - - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: wait(): context canceled") - return false, nil - } - log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) - return false, err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal, - codes.Unavailable: - // non-fatal errors - log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) - default: - log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) - return false, err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return false, ctx.Err() + return nil, errNotConnected } + r, err := c.client.Wait(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + if err != nil { + return false, err } + if resp == nil { + return false, nil + } + return resp.GetCanceled(), nil } // Init signals the workflow is initialized. -func (c *client) Init(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) { - retry := c.newBackOff() - req := new(proto.InitRequest) - req.Id = workflowID - req.State = new(proto.WorkflowState) - req.State.Started = state.Started - req.State.Finished = state.Finished - req.State.Error = state.Error - req.State.Canceled = state.Canceled - for { - if !c.IsConnected() { - if c.shouldGiveUp() { - log.Error().Msg("grpc: connection lost, giving up") - return ErrConnectionLost - } - log.Warn().Msg("grpc: init() waiting for server connection...") - time.Sleep(retry.NextBackOff()) - continue - } - - _, err = c.client.Init(ctx, req) - if err == nil { - break - } - - log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) - - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: init(): context canceled") - return nil - } - log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) - return err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal, - codes.Unavailable: - // non-fatal errors - log.Warn().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) - default: - log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) - return err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return ctx.Err() - } +func (c *client) Init(ctx context.Context, workflowID string, state rpc.WorkflowState) error { + req := &proto.InitRequest{ + Id: workflowID, + State: &proto.WorkflowState{ + Started: state.Started, + Finished: state.Finished, + Error: state.Error, + Canceled: state.Canceled, + }, } - return nil + + _, err := retryRPC(ctx, c, "init", func() (*proto.Empty, error) { + if 
!c.IsConnected() { + return nil, errNotConnected + } + r, err := c.client.Init(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + return err } // Done let agent signal to server the workflow has stopped. -func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) { - retry := c.newBackOff() - req := new(proto.DoneRequest) - req.Id = workflowID - req.State = new(proto.WorkflowState) - req.State.Started = state.Started - req.State.Finished = state.Finished - req.State.Error = state.Error - req.State.Canceled = state.Canceled - for { - if !c.IsConnected() { - if c.shouldGiveUp() { - log.Error().Msg("grpc: connection lost, giving up") - return ErrConnectionLost - } - log.Warn().Msg("grpc: done() waiting for server connection...") - time.Sleep(retry.NextBackOff()) - continue - } - - _, err = c.client.Done(ctx, req) - if err == nil { - break - } - - log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) - - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: done(): context canceled") - return nil - } - log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) - return err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal, - codes.Unavailable: - // non-fatal errors - log.Warn().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) - default: - log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) - return err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return ctx.Err() - } +func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) error { + req := &proto.DoneRequest{ + Id: workflowID, + State: &proto.WorkflowState{ + Started: state.Started, + Finished: state.Finished, + Error: state.Error, + Canceled: state.Canceled, + }, } - return nil + + _, err := retryRPC(ctx, c, "done", func() (*proto.Empty, error) { + if !c.IsConnected() { + return nil, errNotConnected + } + r, err := c.client.Done(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + return err } // Extend extends the workflow deadline. 
-func (c *client) Extend(ctx context.Context, workflowID string) (err error) { - retry := c.newBackOff() - req := new(proto.ExtendRequest) - req.Id = workflowID - for { +func (c *client) Extend(ctx context.Context, workflowID string) error { + req := &proto.ExtendRequest{Id: workflowID} + + _, err := retryRPC(ctx, c, "extend", func() (*proto.Empty, error) { if !c.IsConnected() { - if c.shouldGiveUp() { - log.Error().Msg("grpc: connection lost, giving up") - return ErrConnectionLost - } - log.Warn().Msg("grpc: extend() waiting for server connection...") - time.Sleep(retry.NextBackOff()) - continue + return nil, errNotConnected } - - _, err = c.client.Extend(ctx, req) - if err == nil { - break - } - - log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) - - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: extend(): context canceled") - return nil - } - log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) - return err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal, - codes.Unavailable: - // non-fatal errors - log.Warn().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) - default: - log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) - return err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return ctx.Err() - } - } - return nil + r, err := c.client.Extend(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + return err } // Update let agent updates the step state at the server. -func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) { - retry := c.newBackOff() - req := new(proto.UpdateRequest) - req.Id = workflowID - req.State = new(proto.StepState) - req.State.StepUuid = state.StepUUID - req.State.Started = state.Started - req.State.Finished = state.Finished - req.State.Exited = state.Exited - req.State.ExitCode = int32(state.ExitCode) - req.State.Error = state.Error - req.State.Canceled = state.Canceled - req.State.Skipped = state.Skipped - for { - if !c.IsConnected() { - if c.shouldGiveUp() { - log.Error().Msg("grpc: connection lost, giving up") - return ErrConnectionLost - } - log.Warn().Msg("grpc: update() waiting for server connection...") - time.Sleep(retry.NextBackOff()) - continue - } - - _, err = c.client.Update(ctx, req) - if err == nil { - break - } - - log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) - - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: update(): context canceled") - return nil - } - log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) - return err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal, - codes.Unavailable: - // non-fatal errors - log.Warn().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) - default: - log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) - return err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return ctx.Err() - } +func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) error { + req := &proto.UpdateRequest{ + Id: workflowID, + State: &proto.StepState{ + StepUuid: state.StepUUID, + Started: state.Started, + Finished: state.Finished, + 
Exited: state.Exited, + ExitCode: int32(state.ExitCode), + Error: state.Error, + Canceled: state.Canceled, + Skipped: state.Skipped, + }, } - return nil + + _, err := retryRPC(ctx, c, "update", func() (*proto.Empty, error) { + if !c.IsConnected() { + return nil, errNotConnected + } + r, err := c.client.Update(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + return err } // EnqueueLog queues the log entry to be written in a batch later. @@ -545,53 +397,26 @@ func (c *client) processLogs(ctx context.Context) { func (c *client) sendLogs(ctx context.Context, entries []*proto.LogEntry) error { req := &proto.LogRequest{LogEntries: entries} - retry := c.newBackOff() - for { - _, err := c.client.Log(ctx, req) - if err == nil { - break - } - - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: log(): context canceled") - return nil - } - log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err)) - return err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal, - codes.Unavailable: - // non-fatal errors - log.Warn().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err)) - default: - log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err)) - return err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return ctx.Err() - } - } - return nil + // sendLogs intentionally does not gate on IsConnected — the original code + // didn't either. backoff.Retry will keep trying through transient transport + // errors until MaxElapsedTime elapses. + _, err := retryRPC(ctx, c, "log", func() (*proto.Empty, error) { + r, err := c.client.Log(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + return err } func (c *client) RegisterAgent(ctx context.Context, info rpc.AgentInfo) (int64, error) { - req := new(proto.RegisterAgentRequest) - req.Info = &proto.AgentInfo{ - Platform: info.Platform, - Backend: info.Backend, - Version: info.Version, - Capacity: int32(info.Capacity), - CustomLabels: info.CustomLabels, + req := &proto.RegisterAgentRequest{ + Info: &proto.AgentInfo{ + Platform: info.Platform, + Backend: info.Backend, + Version: info.Version, + Capacity: int32(info.Capacity), + CustomLabels: info.CustomLabels, + }, } res, err := c.client.RegisterAgent(ctx, req) @@ -603,53 +428,15 @@ func (c *client) UnregisterAgent(ctx context.Context) error { return err } -func (c *client) ReportHealth(ctx context.Context) (err error) { - retry := c.newBackOff() - req := new(proto.ReportHealthRequest) - req.Status = "I am alive!" 
+func (c *client) ReportHealth(ctx context.Context) error { + req := &proto.ReportHealthRequest{Status: "I am alive!"} - for { + _, err := retryRPC(ctx, c, "report_health", func() (*proto.Empty, error) { if !c.IsConnected() { - if c.shouldGiveUp() { - log.Error().Msg("grpc: connection lost, giving up") - return ErrConnectionLost - } - log.Warn().Msg("grpc: report_health() waiting for server connection...") - time.Sleep(retry.NextBackOff()) - continue + return nil, errNotConnected } - _, err = c.client.ReportHealth(ctx, req) - if err == nil { - return nil - } - switch status.Code(err) { - case codes.Canceled: - if ctx.Err() != nil { - // expected as context was canceled - log.Debug().Err(err).Msgf("grpc error: report_health(): context canceled") - return nil - } - log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err)) - return err - case - codes.Aborted, - codes.DataLoss, - codes.DeadlineExceeded, - codes.Internal: - // non-fatal errors - log.Warn().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err)) - case - // code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 1.2.3.4:443: i/o timeout\"" - codes.Unavailable: - default: - log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err)) - return err - } - - select { - case <-time.After(retry.NextBackOff()): - case <-ctx.Done(): - return ctx.Err() - } - } + r, err := c.client.ReportHealth(ctx, req) + return r, classifyRPCErr(ctx, err) + }) + return err }
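
---

For reviewers unfamiliar with backoff/v5's generic Retry, below is a minimal, self-contained sketch (not part of the patch) of the pattern retryRPC relies on: an operation that keeps failing with a sentinel "not connected" error, WithMaxElapsedTime acting as the give-up deadline, and the caller translating the exhausted retry budget into a single "connection lost" error. It assumes only the backoff/v5 API already used in this file; the identifiers (flakyCall, errNotConnected, errConnectionLost) are local to the sketch, not the ones from client_grpc.go.

```go
// Standalone sketch of the retry/give-up pattern introduced above.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v5"
)

var (
	errNotConnected   = errors.New("not connected")
	errConnectionLost = errors.New("connection lost")
)

// flakyCall stands in for a gRPC method behind a connection that never comes
// back, so every attempt reports the retryable sentinel errNotConnected.
func flakyCall() (string, error) {
	return "", errNotConnected
}

func main() {
	ctx := context.Background()

	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 10 * time.Millisecond // same tuning as retryOpts
	b.MaxInterval = 10 * time.Second

	res, err := backoff.Retry(ctx, flakyCall,
		backoff.WithBackOff(b),
		// Plays the role of connectionRetryTimeout: once this much time has
		// passed across attempts, Retry stops sleeping and returns the last
		// error from the operation.
		backoff.WithMaxElapsedTime(200*time.Millisecond),
		backoff.WithNotify(func(err error, next time.Duration) {
			fmt.Printf("retrying in %s after: %v\n", next, err)
		}),
	)

	// Mirror retryRPC's translation: exhausting the retry budget while still
	// "not connected" becomes one caller-friendly sentinel.
	if errors.Is(err, errNotConnected) {
		err = errConnectionLost
	}
	fmt.Println(res, err) // "" connection lost

	// An error wrapped with backoff.Permanent, as classifyRPCErr returns for
	// fatal gRPC codes, stops Retry after the first attempt instead.
	_, err = backoff.Retry(ctx, func() (string, error) {
		return "", backoff.Permanent(errors.New("invalid argument"))
	}, backoff.WithBackOff(b))
	fmt.Println(err) // invalid argument
}
```

As in the patch, a zero WithMaxElapsedTime would mean no deadline at all, so an agent configured with connectionRetryTimeout = 0 keeps retrying indefinitely while the connection is down.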