e2e test wait for grpc server teardown and stop agents (#6479)

This commit is contained in:
6543
2026-04-23 09:57:43 +02:00
committed by GitHub
parent 52ed3f13ed
commit 46b73078e9
9 changed files with 33 additions and 32 deletions

View File

@@ -70,11 +70,6 @@ func NewGrpcClient(ctx context.Context, conn *grpc.ClientConn) rpc.Peer {
return client
}
func (c *client) Close() error {
close(c.logs)
return c.conn.Close()
}
func (c *client) IsConnected() bool {
state := c.conn.GetState()
connected := state == connectivity.Ready || state == connectivity.Idle
@@ -506,9 +501,10 @@ func (c *client) processLogs(ctx context.Context) {
bytes = 0
}
// ctx.Done() is covered by the log channel being closed
for {
select {
case <-ctx.Done():
return
case entry, ok := <-c.logs:
if !ok {
log.Info().Msg("log drain: channel closed")

View File

@@ -50,12 +50,12 @@ func TestAgentLabelRouting(t *testing.T) {
})
// Plain agent: wildcard repo label only — cannot satisfy gpu=true.
plainAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
plainAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("plain-agent"),
)
// GPU agent: carries gpu=true — the only agent that can accept the task.
gpuAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
gpuAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("gpu-agent"),
setup.WithCustomLabels(map[string]string{"gpu": "true"}),
)
@@ -108,12 +108,12 @@ Func TestOrgAgentPreferredOverGlobal(t *testing.T) {
})
// Global agent: matches org-id=* (score 1).
globalAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
globalAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("global-agent"),
)
// Org agent: will be patched with the repo's OrgID (score 10).
orgAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
orgAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("org-agent"),
setup.WithOrgID(env.Fixtures.Repo.OrgID),
)

View File

@@ -55,7 +55,7 @@ func TestCancelRunningPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: cancelPipelineYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{

View File

@@ -71,7 +71,7 @@ func TestInfraSmoke(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: simpleSuccessYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
draftPipeline := &model.Pipeline{

View File

@@ -73,7 +73,7 @@ func TestMatrixPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: matrixPipelineYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
@@ -139,7 +139,7 @@ func TestMatrixIncludePipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: matrixIncludePipelineYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
@@ -211,7 +211,7 @@ steps:
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: yaml},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
@@ -259,7 +259,7 @@ steps:
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: yaml},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{

View File

@@ -35,7 +35,7 @@ func TestRestartPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: simpleSuccessYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
// First run.

View File

@@ -47,7 +47,7 @@ func runScenario(t *testing.T, sc Scenario) {
t.Helper()
env := setup.StartServer(t.Context(), t, sc.Files)
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{

View File

@@ -108,7 +108,7 @@ func WithOrgID(id int64) AgentOption {
// server at grpcAddr and returns an *AgentEnv whose AgentID is populated once
// the agent has registered. Pass AgentOption values to configure labels, hostname,
// or org-scoping; multiple agents can be started in the same test.
func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...AgentOption) *AgentEnv { //nolint:contextcheck
func StartAgent(t *testing.T, grpcAddr string, opts ...AgentOption) *AgentEnv {
t.Helper()
cfg := &agentConfig{
@@ -128,8 +128,8 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
Timeout: shortTimeout,
})
authCtx, authCancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { authCancel(nil) })
agentCtx, agentCancel := context.WithCancelCause(t.Context())
t.Cleanup(func() { agentCancel(nil) })
authConn, err := grpc.NewClient(grpcAddr, transport, keepaliveOpts)
if err != nil {
@@ -138,7 +138,7 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
t.Cleanup(func() { authConn.Close() })
authClient := agent_rpc.NewAuthGrpcClient(authConn, TestAgentToken, -1)
authInterceptor, err := agent_rpc.NewAuthInterceptor(authCtx, authClient, agentAuthRefreshEvery) //nolint:contextcheck
authInterceptor, err := agent_rpc.NewAuthInterceptor(agentCtx, authClient, agentAuthRefreshEvery)
if err != nil {
t.Fatalf("StartAgent(%s): authenticate with server: %v", cfg.hostname, err)
}
@@ -155,20 +155,20 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
}
t.Cleanup(func() { conn.Close() })
client := agent_rpc.NewGrpcClient(ctx, conn)
client := agent_rpc.NewGrpcClient(agentCtx, conn)
grpcCtx := metadata.NewOutgoingContext(authCtx, metadata.Pairs("hostname", cfg.hostname))
grpcCtx := metadata.NewOutgoingContext(agentCtx, metadata.Pairs("hostname", cfg.hostname))
backend := dummy.New()
if !backend.IsAvailable(ctx) {
if !backend.IsAvailable(agentCtx) {
t.Fatalf("StartAgent(%s): dummy backend is not available", cfg.hostname)
}
engInfo, err := backend.Load(ctx)
engInfo, err := backend.Load(agentCtx)
if err != nil {
t.Fatalf("StartAgent(%s): load dummy backend: %v", cfg.hostname, err)
}
env.AgentID, err = client.RegisterAgent(grpcCtx, rpc.AgentInfo{ //nolint:contextcheck
env.AgentID, err = client.RegisterAgent(grpcCtx, rpc.AgentInfo{
Version: version.String(),
Backend: backend.Name(),
Platform: engInfo.Platform,
@@ -218,16 +218,16 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
runner := agent.NewRunner(client, filter, cfg.hostname, counter, backend)
log.Debug().Int("slot", slot).Str("hostname", cfg.hostname).Msg("test agent: runner started")
for {
if ctx.Err() != nil {
if agentCtx.Err() != nil {
return
}
if err := runner.Run(ctx); err != nil {
if ctx.Err() != nil {
if err := runner.Run(agentCtx); err != nil {
if agentCtx.Err() != nil {
return
}
log.Error().Err(err).Int("slot", slot).Str("hostname", cfg.hostname).Msg("test agent: runner error, retrying")
select {
case <-ctx.Done():
case <-agentCtx.Done():
return
case <-time.After(500 * time.Millisecond):
}

View File

@@ -193,10 +193,12 @@ func startGRPCServer(ctx context.Context, t *testing.T, s store.Store) string {
s,
))
stopped := make(chan struct{})
grpcCtx, grpcCancel := context.WithCancelCause(ctx)
go func() {
<-grpcCtx.Done()
grpcServer.GracefulStop()
close(stopped)
}()
go func() {
if err := grpcServer.Serve(lis); err != nil {
@@ -204,6 +206,9 @@ func startGRPCServer(ctx context.Context, t *testing.T, s store.Store) string {
}
}()
t.Cleanup(func() { grpcCancel(nil) })
t.Cleanup(func() {
grpcCancel(nil)
<-stopped
})
return addr
}