Merge pull request #30353 from mwielgus/handlers-fed

Automatic merge from submit-queue

Federation - common libs - a set of handy handlers for informers

A common scenario for using informer handlers would be to put the object key in a queue
for global reconciliation.

cc: @quinton-hoole @wojtek-t @kubernetes/sig-cluster-federation

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.kubernetes.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.kubernetes.io/reviews/kubernetes/kubernetes/30353)
<!-- Reviewable:end -->
This commit is contained in:
Kubernetes Submit Queue 2016-08-10 04:45:04 -07:00 committed by GitHub
commit ab68ce9676
2 changed files with 206 additions and 0 deletions

View File

@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"reflect"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
)
// Returns framework.ResourceEventHandlerFuncs that trigger the given function
// on all object changes. Preproc perprocessing function is executed before each trigger and chec.
func NewTriggerOnAllChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return &framework.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
oldObj := old.(pkg_runtime.Object)
preproc(oldObj)
triggerFunc(oldObj)
},
AddFunc: func(cur interface{}) {
curObj := cur.(pkg_runtime.Object)
preproc(curObj)
triggerFunc(curObj)
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkg_runtime.Object)
oldObj := cur.(pkg_runtime.Object)
preproc(curObj)
preproc(oldObj)
if !reflect.DeepEqual(old, cur) {
triggerFunc(curObj)
}
},
}
}
func NewTriggerOnAllChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
}
// Returns framework.ResourceEventHandlerFuncs that trigger the given function
// on object add and delete as well as spec/object meta on update. Preproc preprocessing is executed
// before each trigger and check.
func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
getFieldOrPanic := func(obj interface{}, fieldName string) interface{} {
val := reflect.ValueOf(obj).Elem().FieldByName(fieldName)
if val.IsValid() {
return val.Interface()
} else {
panic(fmt.Errorf("field not found: %s", fieldName))
}
}
return &framework.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
oldObj := old.(pkg_runtime.Object)
preproc(oldObj)
triggerFunc(oldObj)
},
AddFunc: func(cur interface{}) {
curObj := cur.(pkg_runtime.Object)
preproc(curObj)
triggerFunc(curObj)
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkg_runtime.Object)
oldObj := cur.(pkg_runtime.Object)
preproc(curObj)
preproc(oldObj)
if !reflect.DeepEqual(getFieldOrPanic(old, "ObjectMeta"), getFieldOrPanic(cur, "ObjectMeta")) ||
!reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) {
triggerFunc(curObj)
}
},
}
}
func TriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
}

View File

@ -0,0 +1,111 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"github.com/stretchr/testify/assert"
)
func TestHandlers(t *testing.T) {
// There is a single service ns1/s1 in cluster mycluster.
service := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
}
service2 := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
Annotations: map[string]string{
"A": "B",
},
},
}
triggerChan := make(chan struct{}, 1)
triggered := func() bool {
select {
case <-triggerChan:
return true
default:
return false
}
}
trigger := NewTriggerOnAllChangesPreproc(
func(obj pkg_runtime.Object) {
triggerChan <- struct{}{}
},
func(obj pkg_runtime.Object) {
SetClusterName(obj, "mycluster")
})
trigger.OnAdd(&service)
assert.True(t, triggered())
name, err := GetClusterName(&service)
assert.NoError(t, err)
assert.Equal(t, "mycluster", name)
trigger.OnDelete(&service)
assert.True(t, triggered())
trigger.OnUpdate(&service, &service)
assert.False(t, triggered())
trigger.OnUpdate(&service, &service2)
assert.True(t, triggered())
trigger2 := NewTriggerOnMetaAndSpecChangesPreproc(
func(obj pkg_runtime.Object) {
triggerChan <- struct{}{}
},
func(obj pkg_runtime.Object) {
SetClusterName(obj, "mycluster")
})
service.Annotations = make(map[string]string)
trigger2.OnAdd(&service)
assert.True(t, triggered())
name, err = GetClusterName(&service)
assert.NoError(t, err)
assert.Equal(t, "mycluster", name)
trigger2.OnDelete(&service)
assert.True(t, triggered())
trigger2.OnUpdate(&service, &service)
assert.False(t, triggered())
trigger2.OnUpdate(&service, &service2)
assert.True(t, triggered())
service3 := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
Status: api_v1.ServiceStatus{
LoadBalancer: api_v1.LoadBalancerStatus{
Ingress: []api_v1.LoadBalancerIngress{{
Hostname: "A",
}},
},
},
}
trigger2.OnUpdate(&service, &service3)
assert.False(t, triggered())
}