mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-05 17:10:27 +00:00
Create TransformingInformer
TransformingInfomer is like a regular Informer, but allows for applying custom transform functions on the objects received via list/watch API calls. Kubernetes-commit: efd3490076c391823095b4c8bd6e847ae18eb012
This commit is contained in:
committed by
Kubernetes Publisher
parent
f1b4ce1ec0
commit
b100ecfc06
123
tools/cache/controller_test.go
vendored
123
tools/cache/controller_test.go
vendored
@@ -24,6 +24,7 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@@ -451,3 +452,125 @@ func TestPanicPropagated(t *testing.T) {
|
||||
t.Errorf("timeout: the panic failed to propagate from the controller run method!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransformingInformer(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
|
||||
makePod := func(name, generation string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: "namespace",
|
||||
Labels: map[string]string{"generation": generation},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Hostname: "hostname",
|
||||
Subdomain: "subdomain",
|
||||
},
|
||||
}
|
||||
}
|
||||
expectedPod := func(name, generation string) *v1.Pod {
|
||||
pod := makePod(name, generation)
|
||||
pod.Spec.Hostname = "new-hostname"
|
||||
pod.Spec.Subdomain = ""
|
||||
pod.Spec.NodeName = "nodename"
|
||||
return pod
|
||||
}
|
||||
|
||||
source.Add(makePod("pod1", "1"))
|
||||
source.Modify(makePod("pod1", "2"))
|
||||
|
||||
type event struct {
|
||||
eventType watch.EventType
|
||||
previous interface{}
|
||||
current interface{}
|
||||
}
|
||||
events := make(chan event, 10)
|
||||
recordEvent := func(eventType watch.EventType, previous, current interface{}) {
|
||||
events <- event{eventType: eventType, previous: previous, current: current}
|
||||
}
|
||||
verifyEvent := func(eventType watch.EventType, previous, current interface{}) {
|
||||
select {
|
||||
case event := <-events:
|
||||
if event.eventType != eventType {
|
||||
t.Errorf("expected type %v, got %v", eventType, event.eventType)
|
||||
}
|
||||
if !apiequality.Semantic.DeepEqual(event.previous, previous) {
|
||||
t.Errorf("expected previous object %#v, got %#v", previous, event.previous)
|
||||
}
|
||||
if !apiequality.Semantic.DeepEqual(event.current, current) {
|
||||
t.Errorf("expected object %#v, got %#v", current, event.current)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("failed to get event")
|
||||
}
|
||||
}
|
||||
|
||||
podTransformer := func(obj interface{}) (interface{}, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected object type: %T", obj)
|
||||
}
|
||||
pod.Spec.Hostname = "new-hostname"
|
||||
pod.Spec.Subdomain = ""
|
||||
pod.Spec.NodeName = "nodename"
|
||||
|
||||
// Clear out ResourceVersion to simplify comparisons.
|
||||
pod.ResourceVersion = ""
|
||||
|
||||
return pod, nil
|
||||
}
|
||||
|
||||
store, controller := NewTransformingInformer(
|
||||
source,
|
||||
&v1.Pod{},
|
||||
0,
|
||||
ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { recordEvent(watch.Added, nil, obj) },
|
||||
UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
|
||||
DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
|
||||
},
|
||||
podTransformer,
|
||||
)
|
||||
|
||||
verifyStore := func(expectedItems []interface{}) {
|
||||
items := store.List()
|
||||
if len(items) != len(expectedItems) {
|
||||
t.Errorf("unexpected items %v, expected %v", items, expectedItems)
|
||||
}
|
||||
for _, expectedItem := range expectedItems {
|
||||
found := false
|
||||
for _, item := range items {
|
||||
if apiequality.Semantic.DeepEqual(item, expectedItem) {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("expected item %v not found in %v", expectedItem, items)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
go controller.Run(stopCh)
|
||||
|
||||
verifyEvent(watch.Added, nil, expectedPod("pod1", "2"))
|
||||
verifyStore([]interface{}{expectedPod("pod1", "2")})
|
||||
|
||||
source.Add(makePod("pod2", "1"))
|
||||
verifyEvent(watch.Added, nil, expectedPod("pod2", "1"))
|
||||
verifyStore([]interface{}{expectedPod("pod1", "2"), expectedPod("pod2", "1")})
|
||||
|
||||
source.Add(makePod("pod3", "1"))
|
||||
verifyEvent(watch.Added, nil, expectedPod("pod3", "1"))
|
||||
|
||||
source.Modify(makePod("pod2", "2"))
|
||||
verifyEvent(watch.Modified, expectedPod("pod2", "1"), expectedPod("pod2", "2"))
|
||||
|
||||
source.Delete(makePod("pod1", "2"))
|
||||
verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil)
|
||||
verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")})
|
||||
|
||||
close(stopCh)
|
||||
}
|
||||
|
Reference in New Issue
Block a user