From 50ef81ad18f1c16f6cae35e7b98c424cd70d5a4c Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Wed, 28 Jan 2026 19:01:51 -0800 Subject: [PATCH] Add identity tracking for queue items Kubernetes-commit: db27f4c1234a50e3157373c060ac8ba71c513500 --- tools/cache/identity.go | 217 +++++++++++++++++++++++++++++++++++ tools/cache/identity_test.go | 188 ++++++++++++++++++++++++++++++ 2 files changed, 405 insertions(+) create mode 100644 tools/cache/identity.go create mode 100644 tools/cache/identity_test.go diff --git a/tools/cache/identity.go b/tools/cache/identity.go new file mode 100644 index 000000000..eed278dc0 --- /dev/null +++ b/tools/cache/identity.go @@ -0,0 +1,217 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cache is a client-side caching mechanism. It is useful for +// reducing the number of server calls you'd otherwise need to make. +// Reflector watches a server and updates a Store. Two stores are provided; +// one that simply caches objects (for example, to allow a scheduler to +// list currently available nodes), and one that additionally acts as +// a FIFO queue (for example, to allow a scheduler to process incoming +// pods). +package cache + +import ( + "fmt" + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" +) + +// informerNameRegistry tracks all registered InformerName instances to detect collisions. +// Names must be globally unique across a process. +var informerNameRegistry = struct { + sync.Mutex + names map[string]*InformerName +}{ + names: make(map[string]*InformerName), +} + +// InformerName represents a named informer identity used for metrics. +// It is created once at startup (e.g., in cmd/kube-controller-manager) and passed to +// the SharedInformerFactory. The name must be globally unique within a process. +// +// InformerName tracks which GVRs have been registered under +// this name. When an informer requests a name+GVR combination, the first one wins +// and gets metrics enabled. Subsequent requests for the same GVR silently get +// metrics disabled. +type InformerName struct { + name string + // lock protects gvrs map modifications + lock sync.Mutex + // reserved is flipped to false when Release() is called + reserved *atomic.Bool + // gvrs maps each registered GVR to its atomic bool for lock-free Reserved() checks + gvrs map[schema.GroupVersionResource]*atomic.Bool +} + +// NewInformerName creates a new InformerName with the given name. +// The name must be globally unique within the process. If a name collision +// is detected, an error is returned. +// +// The caller should call Release() when the informer name is no longer needed +// (eg. at shutdown) to allow the name to be reused. +func NewInformerName(name string) (*InformerName, error) { + if name == "" { + return nil, fmt.Errorf("informer name cannot be empty") + } + + informerNameRegistry.Lock() + defer informerNameRegistry.Unlock() + + if existing, ok := informerNameRegistry.names[name]; ok { + // Check if the existing one is still reserved + if existing.reserved.Load() { + return nil, fmt.Errorf("informer name %q is already registered", name) + } + // Previous one was released, we can reuse the name + delete(informerNameRegistry.names, name) + } + + reserved := &atomic.Bool{} + reserved.Store(true) + + n := &InformerName{ + name: name, + reserved: reserved, + gvrs: make(map[schema.GroupVersionResource]*atomic.Bool), + } + + informerNameRegistry.names[name] = n + return n, nil +} + +// WithResource registers a GVR under this InformerName and returns an +// InformerNameAndResource that can be passed to FIFO/SharedIndexInformer. +// +// If this is the first time this GVR is registered under this name, the +// returned InformerNameAndResource will have Reserved() return true. +// If the GVR was already registered, the returned InformerNameAndResource +// will have Reserved() return false to prevent duplicate metrics. +func (n *InformerName) WithResource(gvr schema.GroupVersionResource) InformerNameAndResource { + if n == nil { + return InformerNameAndResource{gvr: gvr} + } + + n.lock.Lock() + defer n.lock.Unlock() + + retval := InformerNameAndResource{name: n.name, gvr: gvr, reserved: &atomic.Bool{}} + if n.reserved.Load() { + if _, gvrExists := n.gvrs[gvr]; !gvrExists { + retval.reserved.Store(true) + n.gvrs[gvr] = retval.reserved + } else { + // WithResource is called by generated informer code and probably + // not worth converting to contextual logging, which would require + // changing all those generated APIs. + klog.TODO().Error(nil, "Duplicate informer registration - metrics will not be published", "informerName", n.name, "group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource) + } + } + return retval +} + +// Release marks this InformerName as no longer in use. +// All InformerNameAndResource instances created from this InformerName +// will have their Reserved() return false after this call. +// The name can be reused by a subsequent NewInformerName call. +func (n *InformerName) Release() { + if n == nil { + return + } + + n.lock.Lock() + defer n.lock.Unlock() + + // Flip all GVR-specific flags so that any InformerNameAndResource + // instances that were returned from WithResource() will have + // Reserved() return false. These instances hold pointers to the + // same atomic bools, so we must flip them before clearing the map. + for _, reserved := range n.gvrs { + reserved.Store(false) + } + + // Clear the map + n.gvrs = make(map[schema.GroupVersionResource]*atomic.Bool) + + // Flip the main reserved flag + n.reserved.Store(false) + + // Remove from global registry + informerNameRegistry.Lock() + defer informerNameRegistry.Unlock() + delete(informerNameRegistry.names, n.name) +} + +// Name returns the name of this InformerName. +func (n *InformerName) Name() string { + if n == nil { + return "" + } + return n.name +} + +// InformerNameAndResource represents a specific informer identity with both +// a name and a GVR. This is passed to FIFO and SharedIndexInformer for metrics. +// +// The Reserved() method provides a lock-free check to determine +// if metrics should be published. This is called on every queue operation +// so it must be fast. +type InformerNameAndResource struct { + name string + gvr schema.GroupVersionResource + reserved *atomic.Bool +} + +// Reserved returns true if this informer identity is reserved for metrics. +// This is a lock-free atomic load, safe and fast for hot-path usage. +// +// Returns false if: +// - The InformerNameAndResource is zero-valued (no name was configured) +// - The parent InformerName was released +// - This was a duplicate GVR registration +func (n InformerNameAndResource) Reserved() bool { + if n.reserved == nil { + return false + } + return n.reserved.Load() +} + +// Name returns the informer name. +func (n InformerNameAndResource) Name() string { + return n.name +} + +// GroupVersionResource returns the GVR for this informer identity. +func (n InformerNameAndResource) GroupVersionResource() schema.GroupVersionResource { + return n.gvr +} + +// ResetInformerNamesForTesting clears the informer name registry. +// This is exported for testing purposes only. +func ResetInformerNamesForTesting() { + informerNameRegistry.Lock() + names := make([]*InformerName, 0, len(informerNameRegistry.names)) + for _, name := range informerNameRegistry.names { + names = append(names, name) + } + informerNameRegistry.Unlock() + + for _, name := range names { + name.Release() + } +} diff --git a/tools/cache/identity_test.go b/tools/cache/identity_test.go new file mode 100644 index 000000000..e4c59eec8 --- /dev/null +++ b/tools/cache/identity_test.go @@ -0,0 +1,188 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cache is a client-side caching mechanism. It is useful for +// reducing the number of server calls you'd otherwise need to make. +// Reflector watches a server and updates a Store. Two stores are provided; +// one that simply caches objects (for example, to allow a scheduler to +// list currently available nodes), and one that additionally acts as +// a FIFO queue (for example, to allow a scheduler to process incoming +// pods). +package cache + +import ( + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ( + podsGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + configMapsGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"} +) + +func TestNewInformerName(t *testing.T) { + tests := []struct { + name string + setup func() + inName string + wantErr bool + }{ + { + name: "first name is unique", + setup: func() {}, + inName: "my-informer", + wantErr: false, + }, + { + name: "duplicate name returns error", + setup: func() { + _, _ = NewInformerName("my-informer") + }, + inName: "my-informer", + wantErr: true, + }, + { + name: "empty name returns error", + setup: func() {}, + inName: "", + wantErr: true, + }, + { + name: "different name is unique", + setup: func() { + _, _ = NewInformerName("informer-1") + }, + inName: "informer-2", + wantErr: false, + }, + { + name: "released name can be reused", + setup: func() { + n, _ := NewInformerName("my-informer") + n.Release() + }, + inName: "my-informer", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ResetInformerNamesForTesting() + tt.setup() + + _, err := NewInformerName(tt.inName) + if (err != nil) != tt.wantErr { + t.Errorf("NewInformerName() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestWithResource(t *testing.T) { + tests := []struct { + name string + setup func() *InformerName + gvr schema.GroupVersionResource + wantReserved bool + }{ + { + name: "first GVR is reserved", + setup: func() *InformerName { + n, _ := NewInformerName("my-informer") + return n + }, + gvr: podsGVR, + wantReserved: true, + }, + { + name: "same GVR second time is not reserved", + setup: func() *InformerName { + n, _ := NewInformerName("my-informer") + _ = n.WithResource(podsGVR) + return n + }, + gvr: podsGVR, + wantReserved: false, + }, + { + name: "different GVR is reserved", + setup: func() *InformerName { + n, _ := NewInformerName("my-informer") + _ = n.WithResource(podsGVR) + return n + }, + gvr: configMapsGVR, + wantReserved: true, + }, + { + name: "nil InformerName returns not reserved", + setup: func() *InformerName { + return nil + }, + gvr: podsGVR, + wantReserved: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ResetInformerNamesForTesting() + n := tt.setup() + + id := n.WithResource(tt.gvr) + if got := id.Reserved(); got != tt.wantReserved { + t.Errorf("Reserved() = %v, want %v", got, tt.wantReserved) + } + }) + } +} + +func TestRelease(t *testing.T) { + ResetInformerNamesForTesting() + + n, err := NewInformerName("my-informer") + if err != nil { + t.Fatalf("NewInformerName() error = %v", err) + } + + // Get a reserved identifier + id := n.WithResource(podsGVR) + if !id.Reserved() { + t.Error("Expected Reserved() = true before Release()") + } + + // Release the name + n.Release() + + // The identifier should no longer be reserved + if id.Reserved() { + t.Error("Expected Reserved() = false after Release()") + } + + // Should be able to reuse the name + n2, err := NewInformerName("my-informer") + if err != nil { + t.Errorf("NewInformerName() after Release() error = %v", err) + } + + // New identifier from new name should be reserved + id2 := n2.WithResource(podsGVR) + if !id2.Reserved() { + t.Error("Expected Reserved() = true for new InformerName after Release()") + } +}