Federation - common libs - a set of handy handlers for informers

This commit is contained in:
Marcin Wielgus 2016-08-10 12:18:35 +02:00
parent 93f802b1a6
commit 99ba03f423
2 changed files with 206 additions and 0 deletions

View File

@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"reflect"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
)
// Returns framework.ResourceEventHandlerFuncs that trigger the given function
// on all object changes. Preproc perprocessing function is executed before each trigger and chec.
func NewTriggerOnAllChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return &framework.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
oldObj := old.(pkg_runtime.Object)
preproc(oldObj)
triggerFunc(oldObj)
},
AddFunc: func(cur interface{}) {
curObj := cur.(pkg_runtime.Object)
preproc(curObj)
triggerFunc(curObj)
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkg_runtime.Object)
oldObj := cur.(pkg_runtime.Object)
preproc(curObj)
preproc(oldObj)
if !reflect.DeepEqual(old, cur) {
triggerFunc(curObj)
}
},
}
}
func NewTriggerOnAllChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
}
// Returns framework.ResourceEventHandlerFuncs that trigger the given function
// on object add and delete as well as spec/object meta on update. Preproc preprocessing is executed
// before each trigger and check.
func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
getFieldOrPanic := func(obj interface{}, fieldName string) interface{} {
val := reflect.ValueOf(obj).Elem().FieldByName(fieldName)
if val.IsValid() {
return val.Interface()
} else {
panic(fmt.Errorf("field not found: %s", fieldName))
}
}
return &framework.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
oldObj := old.(pkg_runtime.Object)
preproc(oldObj)
triggerFunc(oldObj)
},
AddFunc: func(cur interface{}) {
curObj := cur.(pkg_runtime.Object)
preproc(curObj)
triggerFunc(curObj)
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkg_runtime.Object)
oldObj := cur.(pkg_runtime.Object)
preproc(curObj)
preproc(oldObj)
if !reflect.DeepEqual(getFieldOrPanic(old, "ObjectMeta"), getFieldOrPanic(cur, "ObjectMeta")) ||
!reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) {
triggerFunc(curObj)
}
},
}
}
func TriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
}

View File

@ -0,0 +1,111 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"github.com/stretchr/testify/assert"
)
func TestHandlers(t *testing.T) {
// There is a single service ns1/s1 in cluster mycluster.
service := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
}
service2 := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
Annotations: map[string]string{
"A": "B",
},
},
}
triggerChan := make(chan struct{}, 1)
triggered := func() bool {
select {
case <-triggerChan:
return true
default:
return false
}
}
trigger := NewTriggerOnAllChangesPreproc(
func(obj pkg_runtime.Object) {
triggerChan <- struct{}{}
},
func(obj pkg_runtime.Object) {
SetClusterName(obj, "mycluster")
})
trigger.OnAdd(&service)
assert.True(t, triggered())
name, err := GetClusterName(&service)
assert.NoError(t, err)
assert.Equal(t, "mycluster", name)
trigger.OnDelete(&service)
assert.True(t, triggered())
trigger.OnUpdate(&service, &service)
assert.False(t, triggered())
trigger.OnUpdate(&service, &service2)
assert.True(t, triggered())
trigger2 := NewTriggerOnMetaAndSpecChangesPreproc(
func(obj pkg_runtime.Object) {
triggerChan <- struct{}{}
},
func(obj pkg_runtime.Object) {
SetClusterName(obj, "mycluster")
})
service.Annotations = make(map[string]string)
trigger2.OnAdd(&service)
assert.True(t, triggered())
name, err = GetClusterName(&service)
assert.NoError(t, err)
assert.Equal(t, "mycluster", name)
trigger2.OnDelete(&service)
assert.True(t, triggered())
trigger2.OnUpdate(&service, &service)
assert.False(t, triggered())
trigger2.OnUpdate(&service, &service2)
assert.True(t, triggered())
service3 := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
Status: api_v1.ServiceStatus{
LoadBalancer: api_v1.LoadBalancerStatus{
Ingress: []api_v1.LoadBalancerIngress{{
Hostname: "A",
}},
},
},
}
trigger2.OnUpdate(&service, &service3)
assert.False(t, triggered())
}