Add podUID in AddReference and DeleteReference

Signed-off-by: Ruquan Zhao <ruquan.zhao@arm.com>
This commit is contained in:
RuquanZhao 2023-05-06 13:58:14 +08:00
parent c9ff286668
commit 936265e870
6 changed files with 38 additions and 34 deletions

View File

@ -22,7 +22,7 @@ import (
"sync"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/kubelet/util"
@ -92,7 +92,7 @@ func isObjectOlder(newObject, oldObject runtime.Object) bool {
return newVersion < oldVersion
}
func (s *objectStore) AddReference(namespace, name string) {
func (s *objectStore) AddReference(namespace, name string, podUID types.UID) {
key := objectKey{namespace: namespace, name: name}
// AddReference is called from RegisterPod, thus it needs to be efficient.
@ -114,7 +114,7 @@ func (s *objectStore) AddReference(namespace, name string) {
item.data = nil
}
func (s *objectStore) DeleteReference(namespace, name string) {
func (s *objectStore) DeleteReference(namespace, name string, podUID types.UID) {
key := objectKey{namespace: namespace, name: name}
s.lock.Lock()
@ -225,7 +225,7 @@ func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
c.lock.Lock()
defer c.lock.Unlock()
for name := range names {
c.objectStore.AddReference(pod.Namespace, name)
c.objectStore.AddReference(pod.Namespace, name, pod.UID)
}
var prev *v1.Pod
key := objectKey{namespace: pod.Namespace, name: pod.Name, uid: pod.UID}
@ -238,7 +238,7 @@ func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
// names and prev need to have their ref counts decremented. Any that
// are only in prev need to be completely removed. This unconditional
// call takes care of both cases.
c.objectStore.DeleteReference(prev.Namespace, name)
c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
}
}
}
@ -252,7 +252,7 @@ func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
delete(c.registeredPods, key)
if prev != nil {
for name := range c.getReferencedObjects(prev) {
c.objectStore.DeleteReference(prev.Namespace, name)
c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
}
}
}

View File

@ -89,13 +89,13 @@ func newCacheBasedSecretManager(store Store) Manager {
func TestSecretStore(t *testing.T) {
fakeClient := &fake.Clientset{}
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
store.AddReference("ns1", "name1")
store.AddReference("ns2", "name2")
store.AddReference("ns1", "name1")
store.AddReference("ns1", "name1")
store.DeleteReference("ns1", "name1")
store.DeleteReference("ns2", "name2")
store.AddReference("ns3", "name3")
store.AddReference("ns1", "name1", "pod1")
store.AddReference("ns2", "name2", "pod2")
store.AddReference("ns1", "name1", "pod3")
store.AddReference("ns1", "name1", "pod4")
store.DeleteReference("ns1", "name1", "pod1")
store.DeleteReference("ns2", "name2", "pod2")
store.AddReference("ns3", "name3", "pod5")
// Adds don't issue Get requests.
actions := fakeClient.Actions()
@ -123,7 +123,7 @@ func TestSecretStore(t *testing.T) {
func TestSecretStoreDeletingSecret(t *testing.T) {
fakeClient := &fake.Clientset{}
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
store.AddReference("ns", "name")
store.AddReference("ns", "name", "pod")
result := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}}
fakeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) {
@ -155,7 +155,7 @@ func TestSecretStoreGetAlwaysRefresh(t *testing.T) {
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, 0)
for i := 0; i < 10; i++ {
store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i), types.UID(fmt.Sprintf("pod-%d", i)))
}
fakeClient.ClearActions()
@ -182,7 +182,7 @@ func TestSecretStoreGetNeverRefresh(t *testing.T) {
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
for i := 0; i < 10; i++ {
store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i), types.UID(fmt.Sprintf("pod-%d", i)))
}
fakeClient.ClearActions()
@ -211,7 +211,7 @@ func TestCustomTTL(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Time{})
store := newSecretStore(fakeClient, fakeClock, customTTL, time.Minute)
store.AddReference("ns", "name")
store.AddReference("ns", "name", "pod")
store.Get("ns", "name")
fakeClient.ClearActions()

View File

@ -17,8 +17,9 @@ limitations under the License.
package manager
import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
)
// Manager is the interface for registering and unregistering
@ -49,12 +50,12 @@ type Store interface {
// AddReference adds a reference to the object to the store.
// Note that multiple additions to the store has to be allowed
// in the implementations and effectively treated as refcounted.
AddReference(namespace, name string)
AddReference(namespace, name string, podUID types.UID)
// DeleteReference deletes reference to the object from the store.
// Note that object should be deleted only when there was a
// corresponding Delete call for each of Add calls (effectively
// when refcount was reduced to zero).
DeleteReference(namespace, name string)
DeleteReference(namespace, name string, podUID types.UID)
// Get an object from a store.
Get(namespace, name string) (runtime.Object, error)
}

View File

@ -21,7 +21,7 @@ import (
"sync"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -245,7 +246,7 @@ func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheIte
return item
}
func (c *objectCache) AddReference(namespace, name string) {
func (c *objectCache) AddReference(namespace, name string, podUID types.UID) {
key := objectKey{namespace: namespace, name: name}
// AddReference is called from RegisterPod thus it needs to be efficient.
@ -263,7 +264,7 @@ func (c *objectCache) AddReference(namespace, name string) {
item.refCount++
}
func (c *objectCache) DeleteReference(namespace, name string) {
func (c *objectCache) DeleteReference(namespace, name string, podUID types.UID) {
key := objectKey{namespace: namespace, name: name}
c.lock.Lock()

View File

@ -23,11 +23,12 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -93,7 +94,7 @@ func TestSecretCache(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
store := newSecretCache(fakeClient, fakeClock, time.Minute)
store.AddReference("ns", "name")
store.AddReference("ns", "name", "pod")
_, err := store.Get("ns", "name")
if !apierrors.IsNotFound(err) {
t.Errorf("Expected NotFound error, got: %v", err)
@ -138,7 +139,7 @@ func TestSecretCache(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
store.DeleteReference("ns", "name")
store.DeleteReference("ns", "name", "pod")
_, err = store.Get("ns", "name")
if err == nil || !strings.Contains(err.Error(), "not registered") {
t.Errorf("unexpected error: %v", err)
@ -163,7 +164,7 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
store := newSecretCache(fakeClient, fakeClock, time.Minute)
store.AddReference("ns", "name")
store.AddReference("ns", "name", "pod")
// This should trigger List and Watch actions eventually.
actionsFn := func() (bool, error) {
actions := fakeClient.Actions()
@ -184,14 +185,14 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
// Next registrations shouldn't trigger any new actions.
for i := 0; i < 20; i++ {
store.AddReference("ns", "name")
store.DeleteReference("ns", "name")
store.AddReference("ns", "name", types.UID(fmt.Sprintf("pod-%d", i)))
store.DeleteReference("ns", "name", types.UID(fmt.Sprintf("pod-%d", i)))
}
actions := fakeClient.Actions()
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
// Final delete also doesn't trigger any action.
store.DeleteReference("ns", "name")
store.DeleteReference("ns", "name", "pod")
_, err := store.Get("ns", "name")
if err == nil || !strings.Contains(err.Error(), "not registered") {
t.Errorf("unexpected error: %v", err)
@ -287,7 +288,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
}
// AddReference should start reflector.
store.AddReference("ns", "name")
store.AddReference("ns", "name", "pod")
if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil {
t.Errorf("item wasn't added to cache")
}
@ -375,7 +376,7 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
}
// AddReference should start reflector.
store.AddReference("ns", "name")
store.AddReference("ns", "name", "pod")
if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
t.Errorf("item wasn't added to cache")
}
@ -467,7 +468,7 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
}
// AddReference should start reflector.
store.AddReference("ns", "name")
store.AddReference("ns", "name", "pod")
if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
t.Errorf("item wasn't added to cache")
}

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@ -107,7 +108,7 @@ func TestWatchBasedManager(t *testing.T) {
for j := 0; j < 100; j++ {
name := fmt.Sprintf("s%d", i*100+j)
start := time.Now()
store.AddReference(testNamespace, name)
store.AddReference(testNamespace, name, types.UID(name))
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
obj, err := store.Get(testNamespace, name)
if err != nil {