diff --git a/federation/pkg/federation-controller/util/handlers.go b/federation/pkg/federation-controller/util/handlers.go new file mode 100644 index 00000000000..08267d448bd --- /dev/null +++ b/federation/pkg/federation-controller/util/handlers.go @@ -0,0 +1,95 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "reflect" + + "k8s.io/kubernetes/pkg/controller/framework" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" +) + +// Returns framework.ResourceEventHandlerFuncs that trigger the given function +// on all object changes. Preproc perprocessing function is executed before each trigger and chec. +func NewTriggerOnAllChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs { + return &framework.ResourceEventHandlerFuncs{ + DeleteFunc: func(old interface{}) { + oldObj := old.(pkg_runtime.Object) + preproc(oldObj) + triggerFunc(oldObj) + }, + AddFunc: func(cur interface{}) { + curObj := cur.(pkg_runtime.Object) + preproc(curObj) + triggerFunc(curObj) + }, + UpdateFunc: func(old, cur interface{}) { + curObj := cur.(pkg_runtime.Object) + oldObj := cur.(pkg_runtime.Object) + preproc(curObj) + preproc(oldObj) + if !reflect.DeepEqual(old, cur) { + triggerFunc(curObj) + } + }, + } +} + +func NewTriggerOnAllChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs { + return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {}) +} + +// Returns framework.ResourceEventHandlerFuncs that trigger the given function +// on object add and delete as well as spec/object meta on update. Preproc preprocessing is executed +// before each trigger and check. +func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs { + getFieldOrPanic := func(obj interface{}, fieldName string) interface{} { + val := reflect.ValueOf(obj).Elem().FieldByName(fieldName) + if val.IsValid() { + return val.Interface() + } else { + panic(fmt.Errorf("field not found: %s", fieldName)) + } + } + return &framework.ResourceEventHandlerFuncs{ + DeleteFunc: func(old interface{}) { + oldObj := old.(pkg_runtime.Object) + preproc(oldObj) + triggerFunc(oldObj) + }, + AddFunc: func(cur interface{}) { + curObj := cur.(pkg_runtime.Object) + preproc(curObj) + triggerFunc(curObj) + }, + UpdateFunc: func(old, cur interface{}) { + curObj := cur.(pkg_runtime.Object) + oldObj := cur.(pkg_runtime.Object) + preproc(curObj) + preproc(oldObj) + if !reflect.DeepEqual(getFieldOrPanic(old, "ObjectMeta"), getFieldOrPanic(cur, "ObjectMeta")) || + !reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) { + triggerFunc(curObj) + } + }, + } +} + +func TriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs { + return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {}) +} diff --git a/federation/pkg/federation-controller/util/handlers_test.go b/federation/pkg/federation-controller/util/handlers_test.go new file mode 100644 index 00000000000..0add2f1828f --- /dev/null +++ b/federation/pkg/federation-controller/util/handlers_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + api_v1 "k8s.io/kubernetes/pkg/api/v1" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + + "github.com/stretchr/testify/assert" +) + +func TestHandlers(t *testing.T) { + // There is a single service ns1/s1 in cluster mycluster. + service := api_v1.Service{ + ObjectMeta: api_v1.ObjectMeta{ + Namespace: "ns1", + Name: "s1", + }, + } + service2 := api_v1.Service{ + ObjectMeta: api_v1.ObjectMeta{ + Namespace: "ns1", + Name: "s1", + Annotations: map[string]string{ + "A": "B", + }, + }, + } + triggerChan := make(chan struct{}, 1) + triggered := func() bool { + select { + case <-triggerChan: + return true + default: + return false + } + } + + trigger := NewTriggerOnAllChangesPreproc( + func(obj pkg_runtime.Object) { + triggerChan <- struct{}{} + }, + func(obj pkg_runtime.Object) { + SetClusterName(obj, "mycluster") + }) + + trigger.OnAdd(&service) + assert.True(t, triggered()) + name, err := GetClusterName(&service) + assert.NoError(t, err) + assert.Equal(t, "mycluster", name) + trigger.OnDelete(&service) + assert.True(t, triggered()) + trigger.OnUpdate(&service, &service) + assert.False(t, triggered()) + trigger.OnUpdate(&service, &service2) + assert.True(t, triggered()) + + trigger2 := NewTriggerOnMetaAndSpecChangesPreproc( + func(obj pkg_runtime.Object) { + triggerChan <- struct{}{} + }, + func(obj pkg_runtime.Object) { + SetClusterName(obj, "mycluster") + }) + + service.Annotations = make(map[string]string) + trigger2.OnAdd(&service) + assert.True(t, triggered()) + name, err = GetClusterName(&service) + assert.NoError(t, err) + assert.Equal(t, "mycluster", name) + trigger2.OnDelete(&service) + assert.True(t, triggered()) + trigger2.OnUpdate(&service, &service) + assert.False(t, triggered()) + trigger2.OnUpdate(&service, &service2) + assert.True(t, triggered()) + + service3 := api_v1.Service{ + ObjectMeta: api_v1.ObjectMeta{ + Namespace: "ns1", + Name: "s1", + }, + Status: api_v1.ServiceStatus{ + LoadBalancer: api_v1.LoadBalancerStatus{ + Ingress: []api_v1.LoadBalancerIngress{{ + Hostname: "A", + }}, + }, + }, + } + trigger2.OnUpdate(&service, &service3) + assert.False(t, triggered()) +}