mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-29 08:29:23 +00:00
Merge pull request #129345 from pohly/log-client-go-workqueue
client-go workqueue: add optional logger Kubernetes-commit: 9fd0e20bc2526483fdb1328305fc12ac638085dc
This commit is contained in:
commit
5f676853f1
4
go.mod
4
go.mod
@ -25,8 +25,8 @@ require (
|
|||||||
golang.org/x/time v0.9.0
|
golang.org/x/time v0.9.0
|
||||||
google.golang.org/protobuf v1.36.5
|
google.golang.org/protobuf v1.36.5
|
||||||
gopkg.in/evanphx/json-patch.v4 v4.12.0
|
gopkg.in/evanphx/json-patch.v4 v4.12.0
|
||||||
k8s.io/api v0.0.0-20250313213104-173e173fb2b8
|
k8s.io/api v0.0.0-20250313213105-aae61a387040
|
||||||
k8s.io/apimachinery v0.0.0-20250313012745-a04ff375cef8
|
k8s.io/apimachinery v0.0.0-20250314052748-eaf4038701d0
|
||||||
k8s.io/klog/v2 v2.130.1
|
k8s.io/klog/v2 v2.130.1
|
||||||
k8s.io/kube-openapi v0.0.0-20250304201544-e5f78fe3ede9
|
k8s.io/kube-openapi v0.0.0-20250304201544-e5f78fe3ede9
|
||||||
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
|
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
|
||||||
|
8
go.sum
8
go.sum
@ -146,10 +146,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
|||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
k8s.io/api v0.0.0-20250313213104-173e173fb2b8 h1:Z7fpBA1zEvgJJjIhketIRekQDQzUp67A3ou5MQZBBFk=
|
k8s.io/api v0.0.0-20250313213105-aae61a387040 h1:t4Fsq5t8BkN81dKsvJL0PobJ7UcEONapi8OEL7GKxdo=
|
||||||
k8s.io/api v0.0.0-20250313213104-173e173fb2b8/go.mod h1:nyT7xKOld0k8IraPSyWNF96YIMppoRBYxtAUIehRQW0=
|
k8s.io/api v0.0.0-20250313213105-aae61a387040/go.mod h1:nyT7xKOld0k8IraPSyWNF96YIMppoRBYxtAUIehRQW0=
|
||||||
k8s.io/apimachinery v0.0.0-20250313012745-a04ff375cef8 h1:aZ3lyC/cbDqhg3/RCSobSABNBeUzBWh6LrHeSnIVzVY=
|
k8s.io/apimachinery v0.0.0-20250314052748-eaf4038701d0 h1:JT6F6sHvZMnT4KRpUaBsuOg5b7AQzK7qwv1stub2/Q4=
|
||||||
k8s.io/apimachinery v0.0.0-20250313012745-a04ff375cef8/go.mod h1:S2OIkExGqJOXYSYcAJwQ9zWcc6BkBUdTJUu4M7z0cvo=
|
k8s.io/apimachinery v0.0.0-20250314052748-eaf4038701d0/go.mod h1:S2OIkExGqJOXYSYcAJwQ9zWcc6BkBUdTJUu4M7z0cvo=
|
||||||
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
|
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
|
||||||
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
|
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
|
||||||
k8s.io/kube-openapi v0.0.0-20250304201544-e5f78fe3ede9 h1:t0huyHnz6HsokckRxAF1bY0cqPFwzINKCL7yltEjZQc=
|
k8s.io/kube-openapi v0.0.0-20250304201544-e5f78fe3ede9 h1:t0huyHnz6HsokckRxAF1bY0cqPFwzINKCL7yltEjZQc=
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/utils/clock"
|
"k8s.io/utils/clock"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -46,6 +47,10 @@ type DelayingQueueConfig = TypedDelayingQueueConfig[any]
|
|||||||
|
|
||||||
// TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
|
// TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
|
||||||
type TypedDelayingQueueConfig[T comparable] struct {
|
type TypedDelayingQueueConfig[T comparable] struct {
|
||||||
|
// An optional logger. The name of the queue does *not* get added to it, this should
|
||||||
|
// be done by the caller if desired.
|
||||||
|
Logger *klog.Logger
|
||||||
|
|
||||||
// Name for the queue. If unnamed, the metrics will not be registered.
|
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||||
Name string
|
Name string
|
||||||
|
|
||||||
@ -94,6 +99,10 @@ func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] {
|
|||||||
// NewTypedDelayingQueueWithConfig constructs a new workqueue with options to
|
// NewTypedDelayingQueueWithConfig constructs a new workqueue with options to
|
||||||
// customize different properties.
|
// customize different properties.
|
||||||
func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] {
|
func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] {
|
||||||
|
logger := klog.Background()
|
||||||
|
if config.Logger != nil {
|
||||||
|
logger = *config.Logger
|
||||||
|
}
|
||||||
if config.Clock == nil {
|
if config.Clock == nil {
|
||||||
config.Clock = clock.RealClock{}
|
config.Clock = clock.RealClock{}
|
||||||
}
|
}
|
||||||
@ -106,7 +115,7 @@ func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConf
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
|
return newDelayingQueue(logger, config.Clock, config.Queue, config.Name, config.MetricsProvider)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
|
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
|
||||||
@ -135,7 +144,7 @@ func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) Delayi
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
|
func newDelayingQueue[T comparable](logger klog.Logger, clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
|
||||||
ret := &delayingType[T]{
|
ret := &delayingType[T]{
|
||||||
TypedInterface: q,
|
TypedInterface: q,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
@ -145,7 +154,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T],
|
|||||||
metrics: newRetryMetrics(name, provider),
|
metrics: newRetryMetrics(name, provider),
|
||||||
}
|
}
|
||||||
|
|
||||||
go ret.waitingLoop()
|
go ret.waitingLoop(logger)
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,8 +273,8 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
|
|||||||
const maxWait = 10 * time.Second
|
const maxWait = 10 * time.Second
|
||||||
|
|
||||||
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
|
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
|
||||||
func (q *delayingType[T]) waitingLoop() {
|
func (q *delayingType[T]) waitingLoop(logger klog.Logger) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrashWithLogger(logger)
|
||||||
|
|
||||||
// Make a placeholder channel to use when there are no items in our list
|
// Make a placeholder channel to use when there are no items in our list
|
||||||
never := make(<-chan time.Time)
|
never := make(<-chan time.Time)
|
||||||
|
@ -74,7 +74,7 @@ func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWo
|
|||||||
wg.Add(workers)
|
wg.Add(workers)
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrashWithContext(ctx)
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for chunk := range toProcess {
|
for chunk := range toProcess {
|
||||||
start := chunk * chunkSize
|
start := chunk * chunkSize
|
||||||
|
Loading…
Reference in New Issue
Block a user