diff --git a/federation/pkg/federation-controller/util/eventsink/eventsink.go b/federation/pkg/federation-controller/util/eventsink/eventsink.go new file mode 100644 index 00000000000..83e9924c4c2 --- /dev/null +++ b/federation/pkg/federation-controller/util/eventsink/eventsink.go @@ -0,0 +1,72 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventsink + +import ( + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + api "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/record" +) + +// Implemnts k8s.io/kubernetes/pkg/client/record.EventSink. +type FederatedEventSink struct { + clientset federation_release_1_4.Interface +} + +// To check if all required functions are implemented. +var _ record.EventSink = &FederatedEventSink{} + +func NewFederatedEventSink(clientset federation_release_1_4.Interface) *FederatedEventSink { + return &FederatedEventSink{ + clientset: clientset, + } +} + +func (fes *FederatedEventSink) Create(event *api.Event) (*api.Event, error) { + return fes.executeOperation(event, func(eventV1 *api_v1.Event) (*api_v1.Event, error) { + return fes.clientset.Core().Events(event.Namespace).Create(eventV1) + }) +} + +func (fes *FederatedEventSink) Update(event *api.Event) (*api.Event, error) { + return fes.executeOperation(event, func(eventV1 *api_v1.Event) (*api_v1.Event, error) { + return fes.clientset.Core().Events(event.Namespace).Update(eventV1) + }) +} + +func (fes *FederatedEventSink) Patch(event *api.Event, data []byte) (*api.Event, error) { + return fes.executeOperation(event, func(eventV1 *api_v1.Event) (*api_v1.Event, error) { + return fes.clientset.Core().Events(event.Namespace).Patch(event.Name, api.StrategicMergePatchType, data) + }) +} + +func (fes *FederatedEventSink) executeOperation(event *api.Event, operation func(*api_v1.Event) (*api_v1.Event, error)) (*api.Event, error) { + var versionedEvent api_v1.Event + if err := api.Scheme.Convert(event, &versionedEvent, nil); err != nil { + return nil, err + } + versionedEventPtr, err := operation(&versionedEvent) + if err != nil { + return nil, err + } + var unversionedEvent api.Event + if err := api.Scheme.Convert(versionedEventPtr, &unversionedEvent, nil); err != nil { + return nil, err + } + return &unversionedEvent, nil +} diff --git a/federation/pkg/federation-controller/util/eventsink/eventsink_test.go b/federation/pkg/federation-controller/util/eventsink/eventsink_test.go new file mode 100644 index 00000000000..8e625dbc8a9 --- /dev/null +++ b/federation/pkg/federation-controller/util/eventsink/eventsink_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventsink + +import ( + "testing" + + fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" + api "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + + "github.com/stretchr/testify/assert" +) + +func TestEventSink(t *testing.T) { + fakeFederationClient := &fake_federation_release_1_4.Clientset{} + createdChan := make(chan runtime.Object, 100) + fakeFederationClient.AddReactor("create", "events", func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + obj := createAction.GetObject() + createdChan <- obj + return true, obj, nil + }) + updateChan := make(chan runtime.Object, 100) + fakeFederationClient.AddReactor("update", "events", func(action core.Action) (bool, runtime.Object, error) { + updateAction := action.(core.UpdateAction) + obj := updateAction.GetObject() + updateChan <- obj + return true, obj, nil + }) + + event := api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "bzium", + Namespace: "ns", + }, + } + sink := NewFederatedEventSink(fakeFederationClient) + eventUpdated, err := sink.Create(&event) + assert.NoError(t, err) + eventV1 := GetObjectFromChan(createdChan).(*api_v1.Event) + assert.NotNil(t, eventV1) + // Just some simple sanity checks. + assert.Equal(t, event.Name, eventV1.Name) + assert.Equal(t, event.Name, eventUpdated.Name) + + eventUpdated, err = sink.Update(&event) + assert.NoError(t, err) + eventV1 = GetObjectFromChan(updateChan).(*api_v1.Event) + assert.NotNil(t, eventV1) + // Just some simple sanity checks. + assert.Equal(t, event.Name, eventV1.Name) + assert.Equal(t, event.Name, eventUpdated.Name) +}