diff --git a/federation/pkg/federation-controller/util/federated_object.go b/federation/pkg/federation-controller/util/federated_object.go new file mode 100644 index 00000000000..76b0dde5cce --- /dev/null +++ b/federation/pkg/federation-controller/util/federated_object.go @@ -0,0 +1,59 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api/meta" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" +) + +const ( + //TODO: This will be removed once cluster name field is added to ObjectMeta. + ClusterNameAnnotation = "federation.io/name" +) + +// TODO: This will be refactored once cluster name field is added to ObjectMeta. +func GetClusterName(obj pkg_runtime.Object) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } + annotations := accessor.GetAnnotations() + if annotations != nil { + if value, found := annotations[ClusterNameAnnotation]; found { + return value, nil + } + } + return "", fmt.Errorf("Cluster information not available") +} + +// TODO: This will be removed once cluster name field is added to ObjectMeta. +func SetClusterName(obj pkg_runtime.Object, clusterName string) error { + accessor, err := meta.Accessor(obj) + if err != nil { + return err + } + annotations := accessor.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + accessor.SetAnnotations(annotations) + } + annotations[ClusterNameAnnotation] = clusterName + return nil +} diff --git a/federation/pkg/federation-controller/util/federated_object_test.go b/federation/pkg/federation-controller/util/federated_object_test.go new file mode 100644 index 00000000000..51bf3e92fa4 --- /dev/null +++ b/federation/pkg/federation-controller/util/federated_object_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + api_v1 "k8s.io/kubernetes/pkg/api/v1" + + "github.com/stretchr/testify/assert" +) + +func TestGetClusterName(t *testing.T) { + // There is a single service ns1/s1 in cluster mycluster. + service := api_v1.Service{ + ObjectMeta: api_v1.ObjectMeta{ + Namespace: "ns1", + Name: "s1", + Annotations: map[string]string{ + ClusterNameAnnotation: "mycluster", + }, + }, + } + name, err := GetClusterName(&service) + assert.NoError(t, err) + assert.Equal(t, "mycluster", name) +} + +func TestSetClusterName(t *testing.T) { + // There is a single service ns1/s1 in cluster mycluster. + service := api_v1.Service{ + ObjectMeta: api_v1.ObjectMeta{ + Namespace: "ns1", + Name: "s1", + }, + } + err := SetClusterName(&service, "mytestname") + assert.NoError(t, err) + clusterName := service.Annotations[ClusterNameAnnotation] + assert.Equal(t, "mytestname", clusterName) +} diff --git a/federation/pkg/federation-controller/util/federated_updater.go b/federation/pkg/federation-controller/util/federated_updater.go new file mode 100644 index 00000000000..f1e640582d8 --- /dev/null +++ b/federation/pkg/federation-controller/util/federated_updater.go @@ -0,0 +1,116 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "time" + + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" +) + +// Type of the operation that can be executed in Federated. +type FederatedOperationType string + +const ( + OperationTypeAdd = "add" + OperationTypeUpdate = "update" + OperationTypeDelete = "delete" +) + +// FederatedOperation definition contains type (add/update/delete) and the object itself. +type FederatedOperation struct { + Type FederatedOperationType + Obj pkg_runtime.Object +} + +// A helper that executes the given set of updates on federation, in parallel. +type FederatedUpdater interface { + // Executes the given set of operations within the specified timeout. + // Timeout is best-effort. There is no guarantee that the underlying operations are + // stopped when it is reached. However the function will return after the timeout + // with a non-nil error. + Update([]FederatedOperation, time.Duration) error +} + +// A function that executes some operation using the passed client and object. +type FederatedOperationHandler func(federation_release_1_4.Interface, pkg_runtime.Object) error + +type federatedUpdaterImpl struct { + federation FederationView + + addFunction FederatedOperationHandler + updateFunction FederatedOperationHandler + deleteFunction FederatedOperationHandler +} + +func NewFederatedUpdater(federation FederationView, add, update, del FederatedOperationHandler) FederatedUpdater { + return &federatedUpdaterImpl{ + federation: federation, + addFunction: add, + updateFunction: update, + deleteFunction: del, + } +} + +func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error { + done := make(chan error, len(ops)) + for _, op := range ops { + go func(op FederatedOperation) { + clusterName, err := GetClusterName(op.Obj) + if err != nil { + done <- err + return + } + + // TODO: Ensure that the clientset has reasonable timeout. + clientset, err := fu.federation.GetClientsetForCluster(clusterName) + if err != nil { + done <- err + return + } + + switch op.Type { + case OperationTypeAdd: + err = fu.addFunction(clientset, op.Obj) + case OperationTypeUpdate: + err = fu.updateFunction(clientset, op.Obj) + case OperationTypeDelete: + err = fu.deleteFunction(clientset, op.Obj) + } + done <- err + }(op) + } + start := time.Now() + for i := 0; i < len(ops); i++ { + now := time.Now() + if !now.Before(start.Add(timeout)) { + return fmt.Errorf("failed to finish all operations in %v", timeout) + } + select { + case err := <-done: + if err != nil { + return err + } + case <-time.After(start.Add(timeout).Sub(now)): + return fmt.Errorf("failed to finish all operations in %v", timeout) + } + } + // All operations finished in time. + return nil +} diff --git a/federation/pkg/federation-controller/util/federated_updater_test.go b/federation/pkg/federation-controller/util/federated_updater_test.go new file mode 100644 index 00000000000..6e29902befd --- /dev/null +++ b/federation/pkg/federation-controller/util/federated_updater_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "testing" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + + "github.com/stretchr/testify/assert" +) + +// Fake federation view. +type fakeFederationView struct { +} + +func (f fakeFederationView) GetClientsetForCluster(clusterName string) (federation_release_1_4.Interface, error) { + return &fake_federation_release_1_4.Clientset{}, nil +} + +func (f *fakeFederationView) GetReadyClusters() ([]*federation_api.Cluster, error) { + return []*federation_api.Cluster{}, nil +} + +func (f *fakeFederationView) GetReadyCluster(name string) (*federation_api.Cluster, bool, error) { + return nil, false, nil +} + +func (f *fakeFederationView) ClustersSynced() bool { + return true +} + +func TestFederatedUpdaterOK(t *testing.T) { + addChan := make(chan string, 5) + updateChan := make(chan string, 5) + + updater := NewFederatedUpdater(&fakeFederationView{}, + func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error { + clusterName, _ := GetClusterName(obj) + addChan <- clusterName + return nil + }, + func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error { + clusterName, _ := GetClusterName(obj) + updateChan <- clusterName + return nil + }, + noop) + + err := updater.Update([]FederatedOperation{ + { + Type: OperationTypeAdd, + Obj: makeService("A", "s1"), + }, + { + Type: OperationTypeUpdate, + Obj: makeService("B", "s1"), + }, + }, time.Minute) + assert.NoError(t, err) + add := <-addChan + update := <-updateChan + assert.Equal(t, "A", add) + assert.Equal(t, "B", update) +} + +func TestFederatedUpdaterError(t *testing.T) { + updater := NewFederatedUpdater(&fakeFederationView{}, + func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error { + return fmt.Errorf("boom") + }, noop, noop) + + err := updater.Update([]FederatedOperation{ + { + Type: OperationTypeAdd, + Obj: makeService("A", "s1"), + }, + { + Type: OperationTypeUpdate, + Obj: makeService("B", "s1"), + }, + }, time.Minute) + assert.Error(t, err) +} + +func TestFederatedUpdaterTimeout(t *testing.T) { + start := time.Now() + updater := NewFederatedUpdater(&fakeFederationView{}, + func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error { + time.Sleep(time.Minute) + return nil + }, + noop, noop) + + err := updater.Update([]FederatedOperation{ + { + Type: OperationTypeAdd, + Obj: makeService("A", "s1"), + }, + { + Type: OperationTypeUpdate, + Obj: makeService("B", "s1"), + }, + }, time.Second) + end := time.Now() + assert.Error(t, err) + assert.True(t, start.Add(10*time.Second).After(end)) +} + +func makeService(cluster, name string) *api_v1.Service { + return &api_v1.Service{ + ObjectMeta: api_v1.ObjectMeta{ + Namespace: "ns1", + Name: name, + Annotations: map[string]string{ + ClusterNameAnnotation: cluster, + }, + }, + } +} + +func noop(_ federation_release_1_4.Interface, _ pkg_runtime.Object) error { + return nil +}