From 231e48e9f39e621eee5cbc01e5e9663d1887f7a2 Mon Sep 17 00:00:00 2001 From: Yassine TIJANI Date: Tue, 3 Jul 2018 22:25:17 +0200 Subject: [PATCH] Implementing logic for v1beta1.Event API Signed-off-by: Yassine TIJANI Kubernetes-commit: 464a994a10b71c45583f3426fd970291f8a5b756 --- go.mod | 21 ++- go.sum | 34 ++-- tools/events/event_broadcaster.go | 285 ++++++++++++++++++++++++++++++ tools/events/event_recorder.go | 89 ++++++++++ tools/events/eventseries_test.go | 208 ++++++++++++++++++++++ tools/events/interfaces.go | 58 ++++++ 6 files changed, 663 insertions(+), 32 deletions(-) create mode 100644 tools/events/event_broadcaster.go create mode 100644 tools/events/event_recorder.go create mode 100644 tools/events/eventseries_test.go create mode 100644 tools/events/interfaces.go diff --git a/go.mod b/go.mod index 177618a6..bc20bde6 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,12 @@ module k8s.io/client-go go 1.12 require ( - github.com/Azure/go-autorest v11.1.2+incompatible + cloud.google.com/go v0.0.0-20160913182117-3b1ae45394a2 // indirect + github.com/Azure/go-autorest v11.1.0+incompatible github.com/davecgh/go-spew v1.1.1 github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda // indirect github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550 - github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 + github.com/gogo/protobuf v1.1.1 github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/protobuf v1.2.0 github.com/google/btree v0.0.0-20160524151835-7d79101e329e // indirect @@ -23,20 +24,22 @@ require ( github.com/stretchr/testify v1.2.2 golang.org/x/crypto v0.0.0-20181025213731-e84da0312774 golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 - golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a + golang.org/x/oauth2 v0.0.0-20170412232759-a6bd8cefa181 golang.org/x/time v0.0.0-20161028155119-f51c12702a4d google.golang.org/appengine v1.5.0 // indirect - k8s.io/api v0.0.0-20190515023547-db5a9d1c40eb - k8s.io/apimachinery v0.0.0-20190515023456-b74e4c97951f - k8s.io/klog v0.3.0 + k8s.io/api v0.0.0 + k8s.io/apimachinery v0.0.0 + k8s.io/klog v0.0.0-20190306015804-8e90cee79f82 k8s.io/utils v0.0.0-20190221042446-c2654d5206da sigs.k8s.io/yaml v1.1.0 ) replace ( - golang.org/x/sync => golang.org/x/sync v0.0.0-20181108010431-42b317875d0f + github.com/gogo/protobuf => github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 + github.com/onsi/ginkgo => github.com/onsi/ginkgo v0.0.0-20170318221715-67b9df7f55fe golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503 golang.org/x/tools => golang.org/x/tools v0.0.0-20190313210603-aa82965741a9 - k8s.io/api => k8s.io/api v0.0.0-20190515023547-db5a9d1c40eb - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190515023456-b74e4c97951f + k8s.io/api => ../api + k8s.io/apimachinery => ../apimachinery + k8s.io/client-go => ../client-go ) diff --git a/go.sum b/go.sum index 538b4678..e050deb3 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg= -cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/Azure/go-autorest v11.1.2+incompatible h1:viZ3tV5l4gE2Sw0xrasFHytCGtzYCrT+um/rrSQ1BfA= -github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +cloud.google.com/go v0.0.0-20160913182117-3b1ae45394a2 h1:BD5IDvMK5RQqnDi7Fbizwoad9Uus/Hs/xJ1zMYVXlS8= +cloud.google.com/go v0.0.0-20160913182117-3b1ae45394a2/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/Azure/go-autorest v11.1.0+incompatible h1:9DfMsQdUMEtg1jKRTjtkNZsvOuZXJOMl4dN1kiQwAc8= +github.com/Azure/go-autorest v11.1.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda h1:NyywMz59neOoVRFDz+ccfKWxn784fiHMDnZSy6T+JXY= @@ -12,7 +12,6 @@ github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e h1:p1yVGRW3nmb85p1 github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550 h1:mV9jbLoSW/8m4VK16ZkHTozJa8sesK5u5kTMFysTYac= github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 h1:WSBJMqJbLxsn+bTCPyPYZfqHdJmc8MK4wrBjMft6BAM= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -22,10 +21,9 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/btree v0.0.0-20160524151835-7d79101e329e h1:JHB7F/4TJCrYBW8+GZO8VkWDj1jxcWuCl6uxKODiyi4= github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= +github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= @@ -35,7 +33,6 @@ github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7 h1:6TSoaYExHpe github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -46,8 +43,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v0.0.0-20170318221715-67b9df7f55fe h1:QAfc0vzfRhbwvVHr25DYiE53mBfvVWgE2gxyzhqor08= +github.com/onsi/ginkgo v0.0.0-20170318221715-67b9df7f55fe/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3 h1:EooPXg51Tn+xmWPXJUGCnJhJSpeuMlBmfJVcqIRmmv8= github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= @@ -60,40 +57,31 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/crypto v0.0.0-20181025213731-e84da0312774 h1:a4tQYYYuK9QdeO/+kEvNYyuR21S+7ve5EANok6hABhI= golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 h1:bfLnR+k0tq5Lqt6dflRLcZiz6UaXCMt3vhYJ1l4FQ80= golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a h1:tImsplftrFpALCYumobsd0K86vlAs/eXGFms2txfJfA= -golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/oauth2 v0.0.0-20170412232759-a6bd8cefa181 h1:/4OaQ4bC66Oq9JDhUnxTjBGt8XBhDuwgMRXHgvfcCUY= +golang.org/x/oauth2 v0.0.0-20170412232759-a6bd8cefa181/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190209173611-3b5209105503 h1:5SvYFrOM3W8Mexn9/oA44Ji7vhXAZQ9hiP+1Q/DMrWg= golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db h1:6/JqlYfC1CCaLnGceQTI+sDGhC9UBSPAsBqI0Gun6kU= golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20161028155119-f51c12702a4d h1:TnM+PKb3ylGmZvyPXmo9m/wktg7Jn/a/fNmr33HSj8g= golang.org/x/time v0.0.0-20161028155119-f51c12702a4d/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20190313210603-aa82965741a9/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -k8s.io/api v0.0.0-20190515023547-db5a9d1c40eb/go.mod h1:fbdFiGtx7GQ3+vkBAYto3QsSImiYIJdpH3YfaclST/U= -k8s.io/apimachinery v0.0.0-20190515023456-b74e4c97951f/go.mod h1:Ew3b/24/JSgJdn4RsnrLskv3LvMZDlZ1Fl1xopsJftY= -k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE= -k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/klog v0.0.0-20190306015804-8e90cee79f82 h1:SHucoAy7lRb+w5oC/hbXyZg+zX+Wftn6hD4tGzHCVqA= +k8s.io/klog v0.0.0-20190306015804-8e90cee79f82/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI= k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= k8s.io/utils v0.0.0-20190221042446-c2654d5206da h1:ElyM7RPonbKnQqOcw7dG2IK5uvQQn3b/WPHqD5mBvP4= diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go new file mode 100644 index 00000000..9d373dd8 --- /dev/null +++ b/tools/events/event_broadcaster.go @@ -0,0 +1,285 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "os" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" + + "k8s.io/api/events/v1beta1" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/record/util" + "k8s.io/klog" +) + +const ( + maxTriesPerEvent = 12 + finishTime = 6 * time.Minute + refreshTime = 30 * time.Minute + maxQueuedEvents = 1000 +) + +var defaultSleepDuration = 10 * time.Second + +// TODO: validate impact of copying and investigate hashing +type eventKey struct { + action string + reason string + reportingController string + reportingInstance string + regarding corev1.ObjectReference + related corev1.ObjectReference +} + +type eventBroadcasterImpl struct { + *watch.Broadcaster + mu sync.RWMutex + eventCache map[eventKey]*v1beta1.Event + sleepDuration time.Duration + sink EventSink +} + +// NewBroadcaster Creates a new event broadcaster. +func NewBroadcaster(sink EventSink) EventBroadcaster { + return newBroadcaster(sink, defaultSleepDuration) +} + +// NewBroadcasterForTest Creates a new event broadcaster for test purposes. +func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster { + return &eventBroadcasterImpl{ + Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + eventCache: map[eventKey]*v1beta1.Event{}, + sleepDuration: sleepDuration, + sink: sink, + } +} + +// TODO: add test for refreshExistingEventSeries +func (e *eventBroadcasterImpl) refreshExistingEventSeries() { + // TODO: Investigate whether lock contention won't be a problem + e.mu.RLock() + defer e.mu.RUnlock() + for isomorphicKey, event := range e.eventCache { + if event.Series != nil { + if recordedEvent, retry := recordEvent(e.sink, event); !retry { + e.eventCache[isomorphicKey] = recordedEvent + } + } + } +} + +// TODO: add test for finishSeries +func (e *eventBroadcasterImpl) finishSeries() { + // TODO: Investigate whether lock contention won't be a problem + e.mu.Lock() + defer e.mu.Unlock() + for isomorphicKey, event := range e.eventCache { + eventSerie := event.Series + if eventSerie != nil { + if eventSerie.LastObservedTime.Time.Add(finishTime).Before(time.Now()) { + if _, retry := recordEvent(e.sink, event); !retry { + delete(e.eventCache, isomorphicKey) + } + } + } + } +} + +// NewRecorder returns an EventRecorder that records events with the given event source. +func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder { + hostname, _ := os.Hostname() + reportingInstance := reportingController + "-" + hostname + return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}} +} + +func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) { + // Make a copy before modification, because there could be multiple listeners. + eventCopy := event.DeepCopy() + go func() { + evToRecord := func() *v1beta1.Event { + e.mu.Lock() + defer e.mu.Unlock() + eventKey := getKey(eventCopy) + isomorphicEvent, isIsomorphic := e.eventCache[eventKey] + if isIsomorphic { + if isomorphicEvent.Series != nil { + isomorphicEvent.Series.Count++ + isomorphicEvent.EventTime = metav1.MicroTime{Time: clock.Now()} + return nil + } + isomorphicEvent.Series = &v1beta1.EventSeries{ + Count: 1, + LastObservedTime: metav1.MicroTime{Time: clock.Now()}, + } + return isomorphicEvent + } + e.eventCache[eventKey] = eventCopy + return eventCopy + }() + if evToRecord != nil { + recordedEvent := e.attemptRecording(evToRecord) + if recordedEvent != nil { + recordedEventKey := getKey(recordedEvent) + e.mu.Lock() + defer e.mu.Unlock() + e.eventCache[recordedEventKey] = recordedEvent + } + } + }() +} + +func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event { + tries := 0 + for { + if recordedEvent, retry := recordEvent(e.sink, event); !retry { + return recordedEvent + } + tries++ + if tries >= maxTriesPerEvent { + klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + return nil + } + // Randomize sleep so that various clients won't all be + // synced up if the master goes down. + time.Sleep(wait.Jitter(e.sleepDuration, 0.25)) + } +} + +func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) { + var newEvent *v1beta1.Event + var err error + isEventSerie := event.Series != nil + if isEventSerie { + patch, err := createPatchBytesForSeries(event) + if err != nil { + klog.Errorf("Unable to calculate diff, no merge is possible: %v", err) + return nil, false + } + newEvent, err = sink.Patch(event, patch) + } + // Update can fail because the event may have been removed and it no longer exists. + if !isEventSerie || (isEventSerie && util.IsKeyNotFoundError(err)) { + // Making sure that ResourceVersion is empty on creation + event.ResourceVersion = "" + newEvent, err = sink.Create(event) + } + if err == nil { + return newEvent, false + } + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + switch err.(type) { + case *restclient.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err) + return nil, false + case *errors.StatusError: + if errors.IsAlreadyExists(err) { + klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err) + } else { + klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err) + } + return nil, false + case *errors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return nil, true +} + +func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) { + oldEvent := event.DeepCopy() + oldEvent.Series = nil + oldData, err := json.Marshal(oldEvent) + if err != nil { + return nil, err + } + newData, err := json.Marshal(event) + if err != nil { + return nil, err + } + return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{}) +} + +func getKey(event *v1beta1.Event) eventKey { + key := eventKey{ + action: event.Action, + reason: event.Reason, + reportingController: event.ReportingController, + reportingInstance: event.ReportingInstance, + regarding: event.Regarding, + } + if event.Related != nil { + key.related = *event.Related + } + return key +} + +// startEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. +// The return value is used to stop recording +func (e *eventBroadcasterImpl) startEventWatcher(eventHandler func(event runtime.Object)) func() { + watcher := e.Watch() + go func() { + defer utilruntime.HandleCrash() + for { + watchEvent, ok := <-watcher.ResultChan() + if !ok { + return + } + eventHandler(watchEvent.Object) + } + }() + return watcher.Stop +} + +// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. +func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { + go wait.Until(func() { + e.refreshExistingEventSeries() + }, refreshTime, stopCh) + go wait.Until(func() { + e.finishSeries() + }, finishTime, stopCh) + eventHandler := func(obj runtime.Object) { + event, ok := obj.(*v1beta1.Event) + if !ok { + klog.Errorf("unexpected type, expected v1beta1.Event") + return + } + e.recordToSink(event, clock.RealClock{}) + } + stopWatcher := e.startEventWatcher(eventHandler) + go func() { + <-stopCh + stopWatcher() + }() +} diff --git a/tools/events/event_recorder.go b/tools/events/event_recorder.go new file mode 100644 index 00000000..c2c18fb3 --- /dev/null +++ b/tools/events/event_recorder.go @@ -0,0 +1,89 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/reference" + + "k8s.io/api/events/v1beta1" + "k8s.io/client-go/tools/record/util" + "k8s.io/klog" +) + +type recorderImpl struct { + scheme *runtime.Scheme + reportingController string + reportingInstance string + *watch.Broadcaster + clock clock.Clock +} + +func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { + timestamp := metav1.MicroTime{time.Now()} + message := fmt.Sprintf(note, args...) + refRegarding, err := reference.GetReference(recorder.scheme, regarding) + if err != nil { + klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message) + return + } + refRelated, err := reference.GetReference(recorder.scheme, related) + if err != nil { + klog.Errorf("Could not construct reference to: '%#v' due to: '%v'.", related, err) + } + if !util.ValidateEventType(eventtype) { + klog.Errorf("Unsupported event type: '%v'", eventtype) + return + } + event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action) + go func() { + defer utilruntime.HandleCrash() + recorder.Action(watch.Added, event) + }() +} + +func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *v1beta1.Event { + t := metav1.Time{Time: recorder.clock.Now()} + namespace := refRegarding.Namespace + if namespace == "" { + namespace = metav1.NamespaceSystem + } + return &v1beta1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()), + Namespace: namespace, + }, + EventTime: timestamp, + Series: nil, + ReportingController: reportingController, + ReportingInstance: reportingInstance, + Action: action, + Reason: reason, + Regarding: *refRegarding, + Related: refRelated, + Note: message, + Type: eventtype, + } +} diff --git a/tools/events/eventseries_test.go b/tools/events/eventseries_test.go new file mode 100644 index 00000000..f4418922 --- /dev/null +++ b/tools/events/eventseries_test.go @@ -0,0 +1,208 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "strconv" + "testing" + "time" + + "os" + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/api/events/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + ref "k8s.io/client-go/tools/reference" +) + +type testEventSeriesSink struct { + OnCreate func(e *v1beta1.Event) (*v1beta1.Event, error) + OnUpdate func(e *v1beta1.Event) (*v1beta1.Event, error) + OnPatch func(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) +} + +// Create records the event for testing. +func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) { + if t.OnCreate != nil { + return t.OnCreate(e) + } + return e, nil +} + +// Update records the event for testing. +func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) { + if t.OnUpdate != nil { + return t.OnUpdate(e) + } + return e, nil +} + +// Patch records the event for testing. +func (t *testEventSeriesSink) Patch(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) { + if t.OnPatch != nil { + return t.OnPatch(e, p) + } + return e, nil +} + +func TestEventSeriesf(t *testing.T) { + hostname, _ := os.Hostname() + + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "bar", + }, + } + + regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]") + if err != nil { + t.Fatal(err) + } + + related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]") + if err != nil { + t.Fatal(err) + } + + expectedEvent := &v1beta1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + EventTime: metav1.MicroTime{time.Now()}, + ReportingController: "eventTest", + ReportingInstance: "eventTest-" + hostname, + Action: "started", + Reason: "test", + Regarding: *regarding, + Related: related, + Note: "some verbose message: 1", + Type: v1.EventTypeNormal, + } + + isomorphicEvent := expectedEvent.DeepCopy() + + nonIsomorphicEvent := expectedEvent.DeepCopy() + nonIsomorphicEvent.Action = "stopped" + + expectedEvent.Series = &v1beta1.EventSeries{Count: 1} + table := []struct { + regarding k8sruntime.Object + related k8sruntime.Object + actual *v1beta1.Event + elements []interface{} + expect *v1beta1.Event + expectUpdate bool + }{ + { + regarding: regarding, + related: related, + actual: isomorphicEvent, + elements: []interface{}{1}, + expect: expectedEvent, + expectUpdate: true, + }, + { + regarding: regarding, + related: related, + actual: nonIsomorphicEvent, + elements: []interface{}{1}, + expect: nonIsomorphicEvent, + expectUpdate: false, + }, + } + + stopCh := make(chan struct{}) + + createEvent := make(chan *v1beta1.Event) + updateEvent := make(chan *v1beta1.Event) + patchEvent := make(chan *v1beta1.Event) + + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here + // we'll use it directly + patchEvent <- event + return event, nil + }, + } + eventBroadcaster := newBroadcaster(&testEvents, 0) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest") + eventBroadcaster.StartRecordingToSink(stopCh) + recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) + // read from the chan as this was needed only to populate the cache + <-createEvent + for index, item := range table { + actual := item.actual + recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements) + // validate event + if item.expectUpdate { + actualEvent := <-patchEvent + t.Logf("%v - validating event affected by patch request", index) + validateEventSerie(strconv.Itoa(index), true, actualEvent, item.expect, t) + } else { + actualEvent := <-createEvent + t.Logf("%v - validating event affected by a create request", index) + validateEventSerie(strconv.Itoa(index), false, actualEvent, item.expect, t) + } + } + close(stopCh) +} + +func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *v1beta1.Event, expectedEvent *v1beta1.Event, t *testing.T) { + recvEvent := *actualEvent + + // Just check that the timestamp was set. + if recvEvent.EventTime.IsZero() { + t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent) + } + + if expectedUpdate { + if recvEvent.Series == nil { + t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series) + + } else { + if recvEvent.Series.Count != expectedEvent.Series.Count { + t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series) + } + } + + // Check that name has the right prefix. + if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) { + t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en) + } + } else { + if recvEvent.Series != nil { + t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series) + } + } + +} diff --git a/tools/events/interfaces.go b/tools/events/interfaces.go new file mode 100644 index 00000000..2c8032aa --- /dev/null +++ b/tools/events/interfaces.go @@ -0,0 +1,58 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "k8s.io/api/events/v1beta1" + "k8s.io/apimachinery/pkg/runtime" +) + +// EventRecorder knows how to record events on behalf of an EventSource. +type EventRecorder interface { + // Eventf constructs an event from the given information and puts it in the queue for sending. + // 'regarding' is the object this event is about. Event will make a reference-- or you may also + // pass a reference to the object directly. + // 'related' is the secondary object for more complex actions. E.g. when regarding object triggers + // a creation or deletion of related object. + // 'type' of this event, and can be one of Normal, Warning. New types could be added in future + // 'reason' is the reason this event is generated. 'reason' should be short and unique; it + // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used + // to automate handling of events, so imagine people writing switch statements to handle them. + // You want to make that easy. + // 'note' is intended to be human readable. + Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) +} + +// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. +type EventBroadcaster interface { + // StartRecordingToSink starts sending events received from the specified eventBroadcaster. + StartRecordingToSink(stopCh <-chan struct{}) + + // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster + // with the event source set to the given event source. + NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder +} + +// EventSink knows how to store events (client-go implements it.) +// EventSink must respect the namespace that will be embedded in 'event'. +// It is assumed that EventSink will return the same sorts of errors as +// client-go's REST client. +type EventSink interface { + Create(event *v1beta1.Event) (*v1beta1.Event, error) + Update(event *v1beta1.Event) (*v1beta1.Event, error) + Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error) +}