mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #30704 from mwielgus/no-cluster-name
Automatic merge from submit-queue Support for no object-native cluster name in federated libs As the deadlines are near and #28921 is still in review and may in the end not entirely suit the lib needs (like ClusterName only in FederatedApiServer) it may be better to not depend on it the libs. So this chanegd: - makes federated informer return a pair obj + cluster name when it is relevant - removes preprocessing handler which caused locking/race troubles anyway The impact on controllers that are still in review is minimal (just couple lines of code). cc: @quinton-hoole @wojtek-t @kubernetes/sig-cluster-federation
This commit is contained in:
commit
d576486208
@ -116,9 +116,8 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
|
||||
controller.NoResyncPeriodFunc(),
|
||||
// Trigger reconcilation whenever something in federated cluster is changed. In most cases it
|
||||
// would be just confirmation that some namespace opration suceeded.
|
||||
util.NewTriggerOnMetaAndSpecChangesPreproc(
|
||||
util.NewTriggerOnMetaAndSpecChanges(
|
||||
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) },
|
||||
func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) },
|
||||
))
|
||||
},
|
||||
|
||||
@ -265,12 +264,12 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
|
||||
ObjectMeta: baseNamespace.ObjectMeta,
|
||||
Spec: baseNamespace.Spec,
|
||||
}
|
||||
util.SetClusterName(desiredNamespace, cluster.Name)
|
||||
|
||||
if !found {
|
||||
operations = append(operations, util.FederatedOperation{
|
||||
Type: util.OperationTypeAdd,
|
||||
Obj: desiredNamespace,
|
||||
Type: util.OperationTypeAdd,
|
||||
Obj: desiredNamespace,
|
||||
ClusterName: cluster.Name,
|
||||
})
|
||||
} else {
|
||||
clusterNamespace := clusterNamespaceObj.(*api_v1.Namespace)
|
||||
@ -279,8 +278,9 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
|
||||
if !reflect.DeepEqual(desiredNamespace.ObjectMeta, clusterNamespace.ObjectMeta) ||
|
||||
!reflect.DeepEqual(desiredNamespace.Spec, clusterNamespace.Spec) {
|
||||
operations = append(operations, util.FederatedOperation{
|
||||
Type: util.OperationTypeUpdate,
|
||||
Obj: desiredNamespace,
|
||||
Type: util.OperationTypeUpdate,
|
||||
Obj: desiredNamespace,
|
||||
ClusterName: cluster.Name,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -40,10 +40,16 @@ const (
|
||||
userAgentName = "federation-service-controller"
|
||||
)
|
||||
|
||||
// An object with an origin information.
|
||||
type FederatedObject struct {
|
||||
Object interface{}
|
||||
ClusterName string
|
||||
}
|
||||
|
||||
// FederatedReadOnlyStore is an overlay over multiple stores created in federated clusters.
|
||||
type FederatedReadOnlyStore interface {
|
||||
// Returns all items in the store.
|
||||
List() ([]interface{}, error)
|
||||
List() ([]FederatedObject, error)
|
||||
|
||||
// Returns all items from a cluster.
|
||||
ListFromCluster(clusterName string) ([]interface{}, error)
|
||||
@ -52,7 +58,7 @@ type FederatedReadOnlyStore interface {
|
||||
GetByKey(clusterName string, key string) (interface{}, bool, error)
|
||||
|
||||
// Returns the items stored under the given key in all clusters.
|
||||
GetFromAllClusters(key string) ([]interface{}, error)
|
||||
GetFromAllClusters(key string) ([]FederatedObject, error)
|
||||
|
||||
// Checks whether stores for all clusters form the lists (and only these) are there and
|
||||
// are synced. This is only a basic check whether the data inside of the store is usable.
|
||||
@ -382,14 +388,15 @@ func (f *federatedInformerImpl) GetTargetStore() FederatedReadOnlyStore {
|
||||
}
|
||||
|
||||
// Returns all items in the store.
|
||||
func (fs *federatedStoreImpl) List() ([]interface{}, error) {
|
||||
func (fs *federatedStoreImpl) List() ([]FederatedObject, error) {
|
||||
fs.federatedInformer.Lock()
|
||||
defer fs.federatedInformer.Unlock()
|
||||
|
||||
result := make([]interface{}, 0)
|
||||
for _, targetInformer := range fs.federatedInformer.targetInformers {
|
||||
values := targetInformer.store.List()
|
||||
result = append(result, values...)
|
||||
result := make([]FederatedObject, 0)
|
||||
for clusterName, targetInformer := range fs.federatedInformer.targetInformers {
|
||||
for _, value := range targetInformer.store.List() {
|
||||
result = append(result, FederatedObject{ClusterName: clusterName, Object: value})
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
@ -418,18 +425,18 @@ func (fs *federatedStoreImpl) GetByKey(clusterName string, key string) (interfac
|
||||
}
|
||||
|
||||
// Returns the items stored under the given key in all clusters.
|
||||
func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]interface{}, error) {
|
||||
func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]FederatedObject, error) {
|
||||
fs.federatedInformer.Lock()
|
||||
defer fs.federatedInformer.Unlock()
|
||||
|
||||
result := make([]interface{}, 0)
|
||||
for _, targetInformer := range fs.federatedInformer.targetInformers {
|
||||
result := make([]FederatedObject, 0)
|
||||
for clusterName, targetInformer := range fs.federatedInformer.targetInformers {
|
||||
value, exist, err := targetInformer.store.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if exist {
|
||||
result = append(result, value)
|
||||
result = append(result, FederatedObject{ClusterName: clusterName, Object: value})
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
|
@ -121,7 +121,8 @@ func TestFederatedInformer(t *testing.T) {
|
||||
assert.Contains(t, readyClusters, &cluster)
|
||||
serviceList, err := informer.GetTargetStore().List()
|
||||
assert.NoError(t, err)
|
||||
assert.Contains(t, serviceList, &service)
|
||||
federatedService := FederatedObject{ClusterName: "mycluster", Object: &service}
|
||||
assert.Contains(t, serviceList, federatedService)
|
||||
service1, found, err := informer.GetTargetStore().GetByKey("mycluster", "ns1/s1")
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, found)
|
||||
|
@ -1,59 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
||||
const (
|
||||
//TODO: This will be removed once cluster name field is added to ObjectMeta.
|
||||
ClusterNameAnnotation = "federation.io/name"
|
||||
)
|
||||
|
||||
// TODO: This will be refactored once cluster name field is added to ObjectMeta.
|
||||
func GetClusterName(obj pkg_runtime.Object) (string, error) {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
annotations := accessor.GetAnnotations()
|
||||
if annotations != nil {
|
||||
if value, found := annotations[ClusterNameAnnotation]; found {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("Cluster information not available")
|
||||
}
|
||||
|
||||
// TODO: This will be removed once cluster name field is added to ObjectMeta.
|
||||
func SetClusterName(obj pkg_runtime.Object, clusterName string) error {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
annotations := accessor.GetAnnotations()
|
||||
if annotations == nil {
|
||||
annotations = make(map[string]string)
|
||||
accessor.SetAnnotations(annotations)
|
||||
}
|
||||
annotations[ClusterNameAnnotation] = clusterName
|
||||
return nil
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
api_v1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestGetClusterName(t *testing.T) {
|
||||
// There is a single service ns1/s1 in cluster mycluster.
|
||||
service := api_v1.Service{
|
||||
ObjectMeta: api_v1.ObjectMeta{
|
||||
Namespace: "ns1",
|
||||
Name: "s1",
|
||||
Annotations: map[string]string{
|
||||
ClusterNameAnnotation: "mycluster",
|
||||
},
|
||||
},
|
||||
}
|
||||
name, err := GetClusterName(&service)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "mycluster", name)
|
||||
}
|
||||
|
||||
func TestSetClusterName(t *testing.T) {
|
||||
// There is a single service ns1/s1 in cluster mycluster.
|
||||
service := api_v1.Service{
|
||||
ObjectMeta: api_v1.ObjectMeta{
|
||||
Namespace: "ns1",
|
||||
Name: "s1",
|
||||
},
|
||||
}
|
||||
err := SetClusterName(&service, "mytestname")
|
||||
assert.NoError(t, err)
|
||||
clusterName := service.Annotations[ClusterNameAnnotation]
|
||||
assert.Equal(t, "mytestname", clusterName)
|
||||
}
|
@ -35,8 +35,9 @@ const (
|
||||
|
||||
// FederatedOperation definition contains type (add/update/delete) and the object itself.
|
||||
type FederatedOperation struct {
|
||||
Type FederatedOperationType
|
||||
Obj pkg_runtime.Object
|
||||
Type FederatedOperationType
|
||||
ClusterName string
|
||||
Obj pkg_runtime.Object
|
||||
}
|
||||
|
||||
// A helper that executes the given set of updates on federation, in parallel.
|
||||
@ -72,11 +73,7 @@ func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Du
|
||||
done := make(chan error, len(ops))
|
||||
for _, op := range ops {
|
||||
go func(op FederatedOperation) {
|
||||
clusterName, err := GetClusterName(op.Obj)
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
clusterName := op.ClusterName
|
||||
|
||||
// TODO: Ensure that the clientset has reasonable timeout.
|
||||
clientset, err := fu.federation.GetClientsetForCluster(clusterName)
|
||||
|
@ -56,13 +56,13 @@ func TestFederatedUpdaterOK(t *testing.T) {
|
||||
|
||||
updater := NewFederatedUpdater(&fakeFederationView{},
|
||||
func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
|
||||
clusterName, _ := GetClusterName(obj)
|
||||
addChan <- clusterName
|
||||
service := obj.(*api_v1.Service)
|
||||
addChan <- service.Name
|
||||
return nil
|
||||
},
|
||||
func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
|
||||
clusterName, _ := GetClusterName(obj)
|
||||
updateChan <- clusterName
|
||||
service := obj.(*api_v1.Service)
|
||||
updateChan <- service.Name
|
||||
return nil
|
||||
},
|
||||
noop)
|
||||
@ -74,14 +74,14 @@ func TestFederatedUpdaterOK(t *testing.T) {
|
||||
},
|
||||
{
|
||||
Type: OperationTypeUpdate,
|
||||
Obj: makeService("B", "s1"),
|
||||
Obj: makeService("B", "s2"),
|
||||
},
|
||||
}, time.Minute)
|
||||
assert.NoError(t, err)
|
||||
add := <-addChan
|
||||
update := <-updateChan
|
||||
assert.Equal(t, "A", add)
|
||||
assert.Equal(t, "B", update)
|
||||
assert.Equal(t, "s1", add)
|
||||
assert.Equal(t, "s2", update)
|
||||
}
|
||||
|
||||
func TestFederatedUpdaterError(t *testing.T) {
|
||||
@ -132,9 +132,6 @@ func makeService(cluster, name string) *api_v1.Service {
|
||||
ObjectMeta: api_v1.ObjectMeta{
|
||||
Namespace: "ns1",
|
||||
Name: name,
|
||||
Annotations: map[string]string{
|
||||
ClusterNameAnnotation: cluster,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -25,24 +25,19 @@ import (
|
||||
)
|
||||
|
||||
// Returns framework.ResourceEventHandlerFuncs that trigger the given function
|
||||
// on all object changes. Preproc perprocessing function is executed before each trigger and chec.
|
||||
func NewTriggerOnAllChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
|
||||
// on all object changes.
|
||||
func NewTriggerOnAllChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
|
||||
return &framework.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: func(old interface{}) {
|
||||
oldObj := old.(pkg_runtime.Object)
|
||||
preproc(oldObj)
|
||||
triggerFunc(oldObj)
|
||||
},
|
||||
AddFunc: func(cur interface{}) {
|
||||
curObj := cur.(pkg_runtime.Object)
|
||||
preproc(curObj)
|
||||
triggerFunc(curObj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
curObj := cur.(pkg_runtime.Object)
|
||||
oldObj := cur.(pkg_runtime.Object)
|
||||
preproc(curObj)
|
||||
preproc(oldObj)
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
triggerFunc(curObj)
|
||||
}
|
||||
@ -50,14 +45,9 @@ func NewTriggerOnAllChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc
|
||||
}
|
||||
}
|
||||
|
||||
func NewTriggerOnAllChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
|
||||
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
|
||||
}
|
||||
|
||||
// Returns framework.ResourceEventHandlerFuncs that trigger the given function
|
||||
// on object add and delete as well as spec/object meta on update. Preproc preprocessing is executed
|
||||
// before each trigger and check.
|
||||
func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object), preproc func(obj pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
|
||||
// on object add and delete as well as spec/object meta on update.
|
||||
func NewTriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
|
||||
getFieldOrPanic := func(obj interface{}, fieldName string) interface{} {
|
||||
val := reflect.ValueOf(obj).Elem().FieldByName(fieldName)
|
||||
if val.IsValid() {
|
||||
@ -69,19 +59,14 @@ func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object),
|
||||
return &framework.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: func(old interface{}) {
|
||||
oldObj := old.(pkg_runtime.Object)
|
||||
preproc(oldObj)
|
||||
triggerFunc(oldObj)
|
||||
},
|
||||
AddFunc: func(cur interface{}) {
|
||||
curObj := cur.(pkg_runtime.Object)
|
||||
preproc(curObj)
|
||||
triggerFunc(curObj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
curObj := cur.(pkg_runtime.Object)
|
||||
oldObj := cur.(pkg_runtime.Object)
|
||||
preproc(curObj)
|
||||
preproc(oldObj)
|
||||
if !reflect.DeepEqual(getFieldOrPanic(old, "ObjectMeta"), getFieldOrPanic(cur, "ObjectMeta")) ||
|
||||
!reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) {
|
||||
triggerFunc(curObj)
|
||||
@ -89,7 +74,3 @@ func NewTriggerOnMetaAndSpecChangesPreproc(triggerFunc func(pkg_runtime.Object),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs {
|
||||
return NewTriggerOnAllChangesPreproc(triggerFunc, func(obj pkg_runtime.Object) {})
|
||||
}
|
||||
|
@ -52,19 +52,13 @@ func TestHandlers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
trigger := NewTriggerOnAllChangesPreproc(
|
||||
trigger := NewTriggerOnAllChanges(
|
||||
func(obj pkg_runtime.Object) {
|
||||
triggerChan <- struct{}{}
|
||||
},
|
||||
func(obj pkg_runtime.Object) {
|
||||
SetClusterName(obj, "mycluster")
|
||||
})
|
||||
|
||||
trigger.OnAdd(&service)
|
||||
assert.True(t, triggered())
|
||||
name, err := GetClusterName(&service)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "mycluster", name)
|
||||
trigger.OnDelete(&service)
|
||||
assert.True(t, triggered())
|
||||
trigger.OnUpdate(&service, &service)
|
||||
@ -72,20 +66,14 @@ func TestHandlers(t *testing.T) {
|
||||
trigger.OnUpdate(&service, &service2)
|
||||
assert.True(t, triggered())
|
||||
|
||||
trigger2 := NewTriggerOnMetaAndSpecChangesPreproc(
|
||||
trigger2 := NewTriggerOnMetaAndSpecChanges(
|
||||
func(obj pkg_runtime.Object) {
|
||||
triggerChan <- struct{}{}
|
||||
},
|
||||
func(obj pkg_runtime.Object) {
|
||||
SetClusterName(obj, "mycluster")
|
||||
})
|
||||
)
|
||||
|
||||
service.Annotations = make(map[string]string)
|
||||
trigger2.OnAdd(&service)
|
||||
assert.True(t, triggered())
|
||||
name, err = GetClusterName(&service)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "mycluster", name)
|
||||
trigger2.OnDelete(&service)
|
||||
assert.True(t, triggered())
|
||||
trigger2.OnUpdate(&service, &service)
|
||||
|
Loading…
Reference in New Issue
Block a user