diff --git a/pkg/kubelet/logs/container_log_manager_test.go b/pkg/kubelet/logs/container_log_manager_test.go index 7d6e7055274..130723a068e 100644 --- a/pkg/kubelet/logs/container_log_manager_test.go +++ b/pkg/kubelet/logs/container_log_manager_test.go @@ -157,15 +157,23 @@ func TestRotateLogs(t *testing.T) { }, } f.SetFakeContainers(testContainers) - go c.processQueueItems(ctx, 1) + + // Push the items into the queue for before starting the worker to avoid issue with the queue being empty. require.NoError(t, c.rotateLogs(ctx)) - pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - err = wait.PollUntilContextCancel(pollTimeoutCtx, 20*time.Millisecond, false, func(ctx context.Context) (done bool, err error) { - return c.queue.Len() == 0, nil - }) - require.NoError(t, err) + // Start a routine that can monitor the queue and shutdown the queue to trigger the retrun from the processQueueItems + // Keeping the monitor duration smaller in order to keep the unwanted delay in the test to a minimal. + go func() { + pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + err = wait.PollUntilContextCancel(pollTimeoutCtx, 5*time.Millisecond, false, func(ctx context.Context) (done bool, err error) { + return c.queue.Len() == 0, nil + }) + require.NoError(t, err) + c.queue.ShutDown() + }() + // This is a blocking call. But the above routine takes care of ensuring that this is terminated once the queue is shutdown + c.processQueueItems(ctx, 1) timestamp := now.Format(timestampFormat) logs, err := os.ReadDir(dir) @@ -176,7 +184,6 @@ func TestRotateLogs(t *testing.T) { assert.Equal(t, testLogs[4]+compressSuffix, logs[2].Name()) assert.Equal(t, testLogs[2]+"."+timestamp, logs[3].Name()) assert.Equal(t, testLogs[3], logs[4].Name()) - c.queue.ShutDown() } func TestClean(t *testing.T) {