Remove Set/GetClusterName hack from federated libs:

This commit is contained in:
Marcin Wielgus 2016-08-16 20:43:43 +02:00
parent c75cefa296
commit 23ca79bd53
7 changed files with 25 additions and 176 deletions

View File

@ -116,9 +116,8 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
controller.NoResyncPeriodFunc(), controller.NoResyncPeriodFunc(),
// Trigger reconcilation whenever something in federated cluster is changed. In most cases it // Trigger reconcilation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some namespace opration suceeded. // would be just confirmation that some namespace opration suceeded.
util.NewTriggerOnMetaAndSpecChangesPreproc( util.NewTriggerOnMetaAndSpecChanges(
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) }, func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) },
func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) },
)) ))
}, },
@ -265,12 +264,12 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
ObjectMeta: baseNamespace.ObjectMeta, ObjectMeta: baseNamespace.ObjectMeta,
Spec: baseNamespace.Spec, Spec: baseNamespace.Spec,
} }
util.SetClusterName(desiredNamespace, cluster.Name)
if !found { if !found {
operations = append(operations, util.FederatedOperation{ operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd, Type: util.OperationTypeAdd,
Obj: desiredNamespace, Obj: desiredNamespace,
ClusterName: cluster.Name,
}) })
} else { } else {
clusterNamespace := clusterNamespaceObj.(*api_v1.Namespace) clusterNamespace := clusterNamespaceObj.(*api_v1.Namespace)
@ -281,6 +280,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
operations = append(operations, util.FederatedOperation{ operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate, Type: util.OperationTypeUpdate,
Obj: desiredNamespace, Obj: desiredNamespace,
ClusterName: cluster.Name,
}) })
} }
} }

View File

@ -1,59 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"k8s.io/kubernetes/pkg/api/meta"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
)
const (
//TODO: This will be removed once cluster name field is added to ObjectMeta.
ClusterNameAnnotation = "federation.io/name"
)
// TODO: This will be refactored once cluster name field is added to ObjectMeta.
func GetClusterName(obj pkg_runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
}
annotations := accessor.GetAnnotations()
if annotations != nil {
if value, found := annotations[ClusterNameAnnotation]; found {
return value, nil
}
}
return "", fmt.Errorf("Cluster information not available")
}
// TODO: This will be removed once cluster name field is added to ObjectMeta.
func SetClusterName(obj pkg_runtime.Object, clusterName string) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
annotations := accessor.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
accessor.SetAnnotations(annotations)
}
annotations[ClusterNameAnnotation] = clusterName
return nil
}

View File

@ -1,55 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"github.com/stretchr/testify/assert"
)
func TestGetClusterName(t *testing.T) {
// There is a single service ns1/s1 in cluster mycluster.
service := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
Annotations: map[string]string{
ClusterNameAnnotation: "mycluster",
},
},
}
name, err := GetClusterName(&service)
assert.NoError(t, err)
assert.Equal(t, "mycluster", name)
}
func TestSetClusterName(t *testing.T) {
// There is a single service ns1/s1 in cluster mycluster.
service := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
}
err := SetClusterName(&service, "mytestname")
assert.NoError(t, err)
clusterName := service.Annotations[ClusterNameAnnotation]
assert.Equal(t, "mytestname", clusterName)
}

View File

@ -36,6 +36,7 @@ const (
// FederatedOperation definition contains type (add/update/delete) and the object itself. // FederatedOperation definition contains type (add/update/delete) and the object itself.
type FederatedOperation struct { type FederatedOperation struct {
Type FederatedOperationType Type FederatedOperationType
ClusterName string
Obj pkg_runtime.Object Obj pkg_runtime.Object
} }
@ -72,11 +73,7 @@ func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Du
done := make(chan error, len(ops)) done := make(chan error, len(ops))
for _, op := range ops { for _, op := range ops {
go func(op FederatedOperation) { go func(op FederatedOperation) {
clusterName, err := GetClusterName(op.Obj) clusterName := op.ClusterName
if err != nil {
done <- err
return
}
// TODO: Ensure that the clientset has reasonable timeout. // TODO: Ensure that the clientset has reasonable timeout.
clientset, err := fu.federation.GetClientsetForCluster(clusterName) clientset, err := fu.federation.GetClientsetForCluster(clusterName)

View File

@ -56,13 +56,13 @@ func TestFederatedUpdaterOK(t *testing.T) {
updater := NewFederatedUpdater(&fakeFederationView{}, updater := NewFederatedUpdater(&fakeFederationView{},
func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error { func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
clusterName, _ := GetClusterName(obj) service := obj.(*api_v1.Service)
addChan <- clusterName addChan <- service.Name
return nil return nil
}, },
func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error { func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
clusterName, _ := GetClusterName(obj) service := obj.(*api_v1.Service)
updateChan <- clusterName updateChan <- service.Name
return nil return nil
}, },
noop) noop)
@ -74,14 +74,14 @@ func TestFederatedUpdaterOK(t *testing.T) {
}, },
{ {
Type: OperationTypeUpdate, Type: OperationTypeUpdate,
Obj: makeService("B", "s1"), Obj: makeService("B", "s2"),
}, },
}, time.Minute) }, time.Minute)
assert.NoError(t, err) assert.NoError(t, err)
add := <-addChan add := <-addChan
update := <-updateChan update := <-updateChan
assert.Equal(t, "A", add) assert.Equal(t, "s1", add)
assert.Equal(t, "B", update) assert.Equal(t, "s2", update)
} }
func TestFederatedUpdaterError(t *testing.T) { func TestFederatedUpdaterError(t *testing.T) {
@ -132,9 +132,6 @@ func makeService(cluster, name string) *api_v1.Service {
ObjectMeta: api_v1.ObjectMeta{ ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1", Namespace: "ns1",
Name: name, Name: name,
Annotations: map[string]string{
ClusterNameAnnotation: cluster,
},
}, },
} }
} }

View File

@ -25,24 +25,19 @@ import (
) )
// Returns framework.ResourceEventHandlerFuncs that trigger the given function // Returns framework.ResourceEventHandlerFuncs that trigger the given function
// on all object changes. Preproc perprocessing function is executed before each trigger and chec. // on all object changes.
func NewTriggerOnAllChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs { func NewTriggerOnAllChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return &framework.ResourceEventHandlerFuncs{ return &framework.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) { DeleteFunc: func(old interface{}) {
oldObj := old.(pkg_runtime.Object) oldObj := old.(pkg_runtime.Object)
preproc(oldObj)
triggerFunc(oldObj) triggerFunc(oldObj)
}, },
AddFunc: func(cur interface{}) { AddFunc: func(cur interface{}) {
curObj := cur.(pkg_runtime.Object) curObj := cur.(pkg_runtime.Object)
preproc(curObj)
triggerFunc(curObj) triggerFunc(curObj)
}, },
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkg_runtime.Object) curObj := cur.(pkg_runtime.Object)
oldObj := cur.(pkg_runtime.Object)
preproc(curObj)
preproc(oldObj)
if !reflect.DeepEqual(old, cur) { if !reflect.DeepEqual(old, cur) {
triggerFunc(curObj) triggerFunc(curObj)
} }
@ -50,14 +45,9 @@ func NewTriggerOnAllChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc
} }
} }
func NewTriggerOnAllChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
}
// Returns framework.ResourceEventHandlerFuncs that trigger the given function // Returns framework.ResourceEventHandlerFuncs that trigger the given function
// on object add and delete as well as spec/object meta on update. Preproc preprocessing is executed // on object add and delete as well as spec/object meta on update.
// before each trigger and check. func NewTriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
getFieldOrPanic := func(obj interface{}, fieldName string) interface{} { getFieldOrPanic := func(obj interface{}, fieldName string) interface{} {
val := reflect.ValueOf(obj).Elem().FieldByName(fieldName) val := reflect.ValueOf(obj).Elem().FieldByName(fieldName)
if val.IsValid() { if val.IsValid() {
@ -69,19 +59,14 @@ func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object),
return &framework.ResourceEventHandlerFuncs{ return &framework.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) { DeleteFunc: func(old interface{}) {
oldObj := old.(pkg_runtime.Object) oldObj := old.(pkg_runtime.Object)
preproc(oldObj)
triggerFunc(oldObj) triggerFunc(oldObj)
}, },
AddFunc: func(cur interface{}) { AddFunc: func(cur interface{}) {
curObj := cur.(pkg_runtime.Object) curObj := cur.(pkg_runtime.Object)
preproc(curObj)
triggerFunc(curObj) triggerFunc(curObj)
}, },
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkg_runtime.Object) curObj := cur.(pkg_runtime.Object)
oldObj := cur.(pkg_runtime.Object)
preproc(curObj)
preproc(oldObj)
if !reflect.DeepEqual(getFieldOrPanic(old, "ObjectMeta"), getFieldOrPanic(cur, "ObjectMeta")) || if !reflect.DeepEqual(getFieldOrPanic(old, "ObjectMeta"), getFieldOrPanic(cur, "ObjectMeta")) ||
!reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) { !reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) {
triggerFunc(curObj) triggerFunc(curObj)
@ -89,7 +74,3 @@ func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object),
}, },
} }
} }
func TriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
}

View File

@ -52,19 +52,13 @@ func TestHandlers(t *testing.T) {
} }
} }
trigger := NewTriggerOnAllChangesPreproc( trigger := NewTriggerOnAllChanges(
func(obj pkg_runtime.Object) { func(obj pkg_runtime.Object) {
triggerChan <- struct{}{} triggerChan <- struct{}{}
},
func(obj pkg_runtime.Object) {
SetClusterName(obj, "mycluster")
}) })
trigger.OnAdd(&service) trigger.OnAdd(&service)
assert.True(t, triggered()) assert.True(t, triggered())
name, err := GetClusterName(&service)
assert.NoError(t, err)
assert.Equal(t, "mycluster", name)
trigger.OnDelete(&service) trigger.OnDelete(&service)
assert.True(t, triggered()) assert.True(t, triggered())
trigger.OnUpdate(&service, &service) trigger.OnUpdate(&service, &service)
@ -72,20 +66,14 @@ func TestHandlers(t *testing.T) {
trigger.OnUpdate(&service, &service2) trigger.OnUpdate(&service, &service2)
assert.True(t, triggered()) assert.True(t, triggered())
trigger2 := NewTriggerOnMetaAndSpecChangesPreproc( trigger2 := NewTriggerOnMetaAndSpecChanges(
func(obj pkg_runtime.Object) { func(obj pkg_runtime.Object) {
triggerChan <- struct{}{} triggerChan <- struct{}{}
}, },
func(obj pkg_runtime.Object) { )
SetClusterName(obj, "mycluster")
})
service.Annotations = make(map[string]string)
trigger2.OnAdd(&service) trigger2.OnAdd(&service)
assert.True(t, triggered()) assert.True(t, triggered())
name, err = GetClusterName(&service)
assert.NoError(t, err)
assert.Equal(t, "mycluster", name)
trigger2.OnDelete(&service) trigger2.OnDelete(&service)
assert.True(t, triggered()) assert.True(t, triggered())
trigger2.OnUpdate(&service, &service) trigger2.OnUpdate(&service, &service)