Events for federated ingress controller

This commit is contained in:
Marcin Wielgus 2016-08-30 23:52:35 +02:00
parent 3fd14d97fb
commit e850d4f0a2
2 changed files with 21 additions and 1 deletions

View File

@ -23,10 +23,12 @@ import (
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
@ -67,6 +69,9 @@ type IngressController struct {
// Backoff manager for ingresses
ingressBackoff *flowcontrol.Backoff
// For events
eventRecorder record.EventRecorder
ingressReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
@ -75,6 +80,10 @@ type IngressController struct {
// NewIngressController returns a new ingress controller
func NewIngressController(client federation_release_1_4.Interface) *IngressController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-ingress-controller"})
ic := &IngressController{
federatedApiClient: client,
ingressReviewDelay: time.Second * 10,
@ -82,6 +91,7 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
ingressBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
}
// Build deliverers for triggering reconcilations.
@ -287,6 +297,9 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
// We can't supply server-created fields when creating a new object.
desiredIngress.ObjectMeta.ResourceVersion = ""
desiredIngress.ObjectMeta.UID = ""
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "CreateInCluster",
"Creating ingress in cluster %s", cluster.Name)
// We always first create an ingress in the first available cluster. Once that ingress
// has been created and allocated a global IP (visible via an annotation),
// we record that annotation on the federated ingress, and create all other cluster
@ -332,6 +345,9 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
for key, val := range baseIngress.ObjectMeta.Annotations {
desiredIngress.ObjectMeta.Annotations[key] = val
}
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateCluster",
"Updating ingress in cluster %s", cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredIngress,
@ -346,7 +362,10 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
return
}
glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
err = ic.federatedUpdater.Update(operations, ic.updateTimeout)
err = ic.federatedUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) {
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "FailedClusterUpdate",
"Update ingress in cluster %s failed: %v", op.ClusterName, operror)
})
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", ingress, err)
ic.deliverIngress(ingress, ic.ingressReviewDelay, true)

View File

@ -79,6 +79,7 @@ func TestIngressController(t *testing.T) {
ObjectMeta: api_v1.ObjectMeta{
Name: "test-ingress",
Namespace: "mynamespace",
SelfLink: "/api/v1/namespaces/mynamespaces/ingress/test-ingress",
},
}