Compare commits

...

65 Commits

Author SHA1 Message Date
Kubernetes Publisher
bcaa470f9d Update dependencies to v0.15.9 tag 2020-01-21 19:36:04 +00:00
Kubernetes Publisher
73fd2ddc91 Merge pull request #86457 from weinong/automated-cherry-pick-of-#86412-upstream-release-1.15
Automated cherry pick of #86412: It fixes a bug where AAD token obtained by kubectl is

Kubernetes-commit: 19a1e169ff18178213ab6e5b2e9069c04d11748e
2019-12-24 23:51:39 -08:00
Weinong Wang
e213ccc306 It fixes a bug where AAD token obtained by kubectl is incompatible with on-behalf-of flow and oidc.
Kubernetes-commit: fcb2225e6772e3e6f1baa2d59dcbe930f49cad2d
2019-12-18 20:21:21 -08:00
Kubernetes Publisher
5f2132fc43 Merge pull request #83919 from liggitt/automated-cherry-pick-of-#83911-upstream-release-1.15
Automated cherry pick of #83911: Remove check causing informers to miss notifications

Kubernetes-commit: 0f71449be978a5e9efca3e7431941cc90211b339
2019-10-28 19:14:42 -07:00
matte21
699a46738f Remove check causing informers to miss notifications
Fix DeltaFIFO bug that caused the sync delta created by a relist
for object ID X to be dropped if the DeltaFIFO already stored a
Delete delta for X. This caused SharedIndexInformer to miss create
notifications. Also, add unit test to expose the bug.

Kubernetes-commit: d9930d43559b24743b21f2cc27b3d2c457e650a3
2019-10-14 18:19:05 +02:00
Kubernetes Publisher
b2f42092e3 Merge pull request #83434 from liggitt/automated-cherry-pick-of-#83261-upstream-release-1.15
[1.15] Automated cherry pick of #83261: bump gopkg.in/yaml.v2 v2.2.4

Kubernetes-commit: 9a83f46a506124685802559c4a3981f4f415d1de
2019-10-04 12:04:15 +00:00
Jordan Liggitt
539d9f1f03 bump gopkg.in/yaml.v2 v2.2.4
Kubernetes-commit: 8df7ee083fe4dcb39a871b6086a638401bad98c1
2019-10-02 14:46:08 -04:00
Kubernetes Publisher
637fc595d1 Merge pull request #81522 from cblecker/1.15-x/net
Update golang/x/net dependency on release-1.15

Kubernetes-commit: 2d3c76f9091b6bec110a5e63777c332469e0cba2
2019-08-17 02:15:27 +00:00
Christoph Blecker
069aa55f4a Update golang/x/net dependency
Kubernetes-commit: 842ee1f5889a4e31a48a5cd9c3cbf1edec7c6848
2019-08-16 11:01:55 -07:00
Kubernetes Publisher
44c2c549a5 Merge pull request #81355 from wojtek-t/automated-cherry-pick-of-#80978-upstream-release-1.15
Manual cherry pick of #80978 upstream release 1.15

Kubernetes-commit: 32884077729c30b123806aa5cad4af3bce63631c
2019-08-16 06:15:17 +00:00
wojtekt
96d4b4ca2c Fix events test
Kubernetes-commit: 18fdcbb5da09d64ba963b1cefcf91597a926e4d0
2019-08-13 16:14:46 +02:00
wojtekt
5abdc64bf8 Fix GetReference function
Kubernetes-commit: 363ec38db2fb9ca975411b9f864b0f5596c889c0
2019-08-05 08:37:55 +02:00
Kubernetes Publisher
e4cdb82809 Merge remote-tracking branch 'origin/master' into release-1.15
Kubernetes-commit: 6127bfb1bbd447e15fd839f3987abe1b8e5188b3
2019-06-12 21:03:32 +00:00
Krzysztof Siedlecki
3c67f637e2 Revert "Bump klog to v0.3.2"
Kubernetes-commit: 7dcec919a2e5e8e23da3d0dd61276d86c9b0e4b6
2019-06-12 10:27:41 +02:00
Kubernetes Publisher
935aad3790 Merge remote-tracking branch 'origin/master' into release-1.15
Kubernetes-commit: 3c5152b9a54ed2273710900be24e5d0828bff79b
2019-06-02 13:04:12 +00:00
Yassine TIJANI
f0e0a63eea set deprecatedEventSource to be backward compatible
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>

Kubernetes-commit: 41e384397cf8505f226dc18f4250b297f98937ab
2019-05-30 13:03:20 +00:00
Kubernetes Publisher
3e37d3cec6 Merge remote-tracking branch 'origin/master' into release-1.15
Kubernetes-commit: 6be12759ea726aa785ba3a5937bbc75e479c1efd
2019-05-31 03:04:30 +00:00
Kubernetes Publisher
e85d69084a Merge pull request #78482 from tedyu/evt-expansion
Check namespaces match in UpdateWithEventNamespace

Kubernetes-commit: b3981a2f9acce4ee57f55131873ceaccf1707598
2019-05-31 13:29:21 +00:00
Kubernetes Publisher
e4ab670a95 Merge pull request #78465 from yuwenma/bump-klog
Bump klog to v0.3.2

Kubernetes-commit: b094dd9bc3a4617b587b04993931a6110691ddc0
2019-05-31 13:29:20 +00:00
Kubernetes Publisher
f6af94938b Merge remote-tracking branch 'origin/master' into release-1.15
Kubernetes-commit: 67f7c9f6ff8e61b44bd62db9ba6e0660b38c0617
2019-05-30 04:57:07 +00:00
Kubernetes Publisher
b34ddfd55a Merge pull request #78037 from yastij/fix-event-cleanup
clean singleton event when calling finishSeries

Kubernetes-commit: b0a81349effc68e19a6e063c8b7ebe0a4c8774e3
2019-05-31 13:29:18 +00:00
Kubernetes Publisher
46aef4b64f Merge pull request #78176 from yastij/event-expansion-client
add event_expansion for v1beta1 Events

Kubernetes-commit: 3978efc2167d9b3bcf266b7c2f3e7fb579f04e20
2019-05-31 13:29:17 +00:00
Kubernetes Publisher
0f8ad595c9 Merge pull request #77434 from mikedanese/watching
NewIndexerInformerWatcher: fix goroutine leak

Kubernetes-commit: 4fed75302a869c1c633e482af5e9d5fa3966fba6
2019-05-31 13:29:17 +00:00
Ted Yu
ea09349e3b Check namespaces match in UpdateWithEventNamespace
Kubernetes-commit: 5990a7d5070400616192c09e3d68f5cebcf9db2f
2019-05-29 08:40:02 -07:00
yuwenma
f78d31a2bc Bump klog to v0.3.2
Kubernetes-commit: 5cef37433e55827226f20981598ddfa2c6511809
2019-05-28 22:45:19 -07:00
Kubernetes Publisher
1d2c64df00 Merge remote-tracking branch 'origin/master' into release-1.15. Deleting CHANGELOG-1.14.md
Kubernetes-commit: 5991262b24f5e834654716148c939624536cce3a
2019-05-23 09:39:02 +00:00
Kubernetes Publisher
0810080103 Merge pull request #77793 from SataQiu/fix-golint-client-go-20190513
Fix golint failures of client-go/tools/auth client-go/tools/portforward

Kubernetes-commit: 5666982b2745e88d2009e4155d4a22f6c01061df
2019-05-28 15:52:23 +00:00
Michael Taufen
0976f4afe2 Update klog to v0.3.1
Includes recent fixes, notably https://github.com/kubernetes/klog/pull/66

Kubernetes-commit: ee7bcc53a206f669b057e38a477b51b3477aab23
2019-05-22 10:51:33 -07:00
Kubernetes Publisher
0673906c42 Merge remote-tracking branch 'origin/master' into release-1.15
Kubernetes-commit: 5b4de8c6ac4a34323a8582765a969d5ecfcf5467
2019-05-21 08:03:21 +00:00
Kubernetes Publisher
a2e13ccd49 Merge pull request #78019 from mrkm4ntr/use-constant
Change to use a constant

Kubernetes-commit: 15d88d19d33ffd50bc54bcb7f95946ecedb4e42b
2019-05-28 15:52:20 +00:00
Kubernetes Publisher
127c8ce629 Merge pull request #77991 from mikedanese/ledoc
cleanup some leader election client doc

Kubernetes-commit: 69c90d8cca62d4dbb1acb966608d5100bada5500
2019-05-28 15:52:19 +00:00
Kubernetes Publisher
26011c05ab Merge pull request #78063 from tedyu/evt-lock
Change lock type to write lock in eventBroadcasterImpl

Kubernetes-commit: 667c3ed94cdf52fb7dd26cfa9de7759df639c7d2
2019-05-28 15:52:18 +00:00
Kubernetes Publisher
9bcc910985 Merge pull request #77170 from smarterclayton/delay_queue_reentrant
DelayingQueue.ShutDown() should be reentrant

Kubernetes-commit: 4891eaa3adbfafb61d2bd264d2f0daef124ee8b9
2019-05-28 15:52:17 +00:00
Kubernetes Publisher
3ed0010e30 Merge remote-tracking branch 'origin/master' into release-1.15. Deleting CHANGELOG-1.14.md
Kubernetes-commit: 19e1375a042a0d5309e37c01e19af5835f78fce0
2019-05-18 03:23:38 +00:00
Kubernetes Publisher
c0015c0cc1 Merge pull request #78041 from yastij/fix-lastTimeObserved
update LastObservedTime instead of eventTime

Kubernetes-commit: b6f51d16d8a26558356b2eb876f7f3fdb0265aa9
2019-05-28 15:52:15 +00:00
Ted Yu
22ef81a898 Change lock type to write lock in eventBroadcasterImpl
Kubernetes-commit: 94af465819bd41a97b00acf997d5a7e077cbb654
2019-05-17 19:19:42 -07:00
Yassine TIJANI
7d76947ccd update LastObservedTime instead of eventTime
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>

Kubernetes-commit: c6b224b16cff3c69b7a0d5b98b02c89777277971
2019-05-17 18:56:38 +02:00
Ted Yu
7615511cc0 Use lock in eventBroadcasterImpl#refreshExistingEventSeries
Kubernetes-commit: 32241b0751c9b9bd5d061eae9a42bd88970d8478
2019-05-17 09:30:54 -07:00
Kubernetes Publisher
a99ac36e1a Merge pull request #65782 from yastij/eventv2-eventf
Implementing logic for v1beta1.Event API

Kubernetes-commit: e67c266a72bf7b379a2b283a9989804f2282bac5
2019-05-28 15:52:14 +00:00
Kubernetes Publisher
424bde0e01 Merge pull request #77945 from michaelfig/client-go-namespaced-dynamicinformer
Honour NewFilteredDynamicSharedInformerFactory namespace argument

Kubernetes-commit: e0f28e5d0f03392e2970742426438c05a1778cb4
2019-05-28 15:52:13 +00:00
Kubernetes Publisher
958c03a8dd Merge pull request #77936 from liggitt/shorten-cert-wait
Interrupt WaitForCertificate if desired kubelet serving cert changes

Kubernetes-commit: a6b546eb72481165bc4d799d74c2ee8ae903e254
2019-05-28 15:52:12 +00:00
Kubernetes Publisher
9a62005bf9 Merge pull request #77925 from liggitt/icc-tokenfile
honor overridden tokenfile, add InClusterConfig override tests

Kubernetes-commit: 56683a2f1f4a16f592634306633a6bb3e252f051
2019-05-28 15:52:11 +00:00
Yassine TIJANI
0f3c6ef2dc clean singleton event when calling finishSeries
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>

Kubernetes-commit: b6663e8d4fceb6f7de7267149341d51966276df8
2019-05-17 16:55:45 +02:00
Yassine TIJANI
4deb1a7db2 add event_expansion for v1beta1 Events
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>

Kubernetes-commit: e29cb0fb7fbf5470e7caf2b3f1fc3d039bdda4ca
2019-05-17 12:12:19 +02:00
Shintaro Murakami
d64f019211 Change to use a constant
Kubernetes-commit: 2fcb248354ed5a3bf3154b1a8a0aafa237b2b37c
2019-05-17 18:40:03 +09:00
Mike Danese
73e7057101 cleanup some leader election client doc
Kubernetes-commit: 5df7f529f063ab354b050418b40c1f1dd2600175
2019-05-16 12:54:50 -07:00
Kubernetes Publisher
2c91b666c2 Merge remote-tracking branch 'origin/master' into release-1.15
Kubernetes-commit: 396d7ad1f381e46b52cc30e2fb7de8c27ede6fd3
2019-05-16 16:46:12 +00:00
Kubernetes Publisher
94836903b2 Merge pull request #77874 from yuchengwu/fix-CVE-2019-11244
fix CVE-2019-11244: `kubectl --http-cache=<world-accessible dir>` cre…

Kubernetes-commit: 730bc968b95842022c4c81d607bf6411b459a675
2019-05-28 15:52:09 +00:00
Kubernetes Publisher
4ea001dc73 Merge pull request #77585 from andyxning/fix_leader_election_start
enhance leader election doc

Kubernetes-commit: c2847e8b689c92d5cc4f1bb00d96df9dcc2cebfe
2019-05-28 15:52:08 +00:00
Kubernetes Publisher
f7ff47eb37 Merge pull request #76442 from viegasdom/fix-golint-utils-bandwith
Fix golint failures of util/bandwith/*.go

Kubernetes-commit: 37281a400d29ec4f515fb8a590d4f012689d89d7
2019-05-28 15:52:07 +00:00
viegasdom
45ef4fc0dc sync: squashed up to merge 69bd30507559be3dea905686b46bc3295c951f45 in 37281a400d29ec4f515fb8a590d4f012689d89d7 2019-05-28 15:52:07 +00:00
Kubernetes Publisher
c428844852 Merge remote-tracking branch 'origin/master' into release-1.15
Kubernetes-commit: 5d5e3fbd84b30d8bf59bfdf4c14d01196869b92f
2019-05-16 03:34:24 +00:00
Kubernetes Publisher
a0a9a3744f Merge pull request #77613 from mikedanese/fixinclusterconfig
BoundServiceAccountTokenVolume: fix InClusterConfig

Kubernetes-commit: 5c4b6528c2e9fa989bb6af9dea15d28ca6ac4ef3
2019-05-28 15:52:06 +00:00
Michael FIG
fa64ba9235 Honour NewFilteredDynamicSharedInformerFactory namespace argument
Kubernetes-commit: 2ed3272a5e428690c066a7c09e5c6a2fad3d74b6
2019-05-15 11:48:17 -06:00
Jordan Liggitt
ed62839de3 Interrupt WaitForCertificate if desired kubelet serving cert changes
Kubernetes-commit: 739a75fc32c5337ddbd13691e9bf6648fb13ff0d
2019-05-15 11:47:23 -04:00
Jordan Liggitt
85e546b378 honor overridden tokenfile, add InClusterConfig override tests
Kubernetes-commit: 7306fb7a89739a2fb48bfeb74595a5daa25060bd
2019-05-15 08:15:02 -04:00
Mike Danese
465fab1523 BoundServiceAccountTokenVolume: fix InClusterConfig
Kubernetes-commit: 4198f28855cbda6dac61408fcba6f2d576a9347c
2019-05-14 09:29:16 -07:00
Yucheng Wu
c076c6e063 fix CVE-2019-11244: kubectl --http-cache=<world-accessible dir> creates world-writeable cached schema files
Kubernetes-commit: f228ae3364729caed59087e23c42868454bc3ff4
2019-05-14 14:49:38 +08:00
Andy Xie
171f146b90 enhance leader election doc
Kubernetes-commit: 95f33ce39957e86cc3e81f4c8f325ecd133cbe9d
2019-05-14 10:36:22 +08:00
SataQiu
98cbc2c6ac fix golint failures of client-go/tools/auth client-go/tools/portforward
Kubernetes-commit: a89d75e3b822a13377373b0ef1c1969c69c616ec
2019-05-13 11:13:38 +08:00
Mike Danese
5551f5649d NewIndexerInformerWatcher: fix goroutine leak
There was some weird queuing going on. The queue was implement by
spawning goroutines that would block on the "ticketer" until it was
their turn to write to the output channel. If N events where in the
watch when the consumer of the watch stopped reading events, N
goroutines would leak. In unit tests of the certificate manager, this
was causing ~10k goroutines to leak.

Fix it with a buffering event processor that uses only one routine and
cancels correctly.

Kubernetes-commit: cafc640bfa0f7362b178b1b896085962d018afe3
2019-05-03 23:50:53 -04:00
Clayton Coleman
501db86fbe DelayingQueue.ShutDown() should be reentrant
All queue ShutDown() calls should be able to be invoked multiple times.

```
Observed a panic: "close of closed channel" (close of closed channel)
/go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:76
/go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:65
/go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:51
/usr/local/go/src/runtime/asm_amd64.s:573
/usr/local/go/src/runtime/panic.go:502
/usr/local/go/src/runtime/chan.go:333
/go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/client-go/util/workqueue/delaying_queue.go:137
```

Use sync.Once to guarantee a single close.

Kubernetes-commit: d2f7eb5235a93556261c8947e7a87342aeeaee2b
2019-04-27 16:16:55 -04:00
Mike Danese
ce329ac04e vendor github.com/google/go-cmp
Kubernetes-commit: 76f683a8f3dc2977846e16b2ea14208a51c2cb6b
2019-04-22 21:41:46 -07:00
Kubernetes Publisher
dd7f3ad83f Merge pull request #77456 from MikeSpreitzer/fix-run-comment
Fix comment on SharedInformer.Run

Kubernetes-commit: e5fec6507bf1642917c34d84ef453ec53cd47dbc
2019-05-14 18:40:34 +00:00
Yassine TIJANI
ba984c792f Implementing logic for v1beta1.Event API
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>

Kubernetes-commit: 464a994a10b71c45583f3426fd970291f8a5b756
2018-07-03 22:25:17 +02:00
33 changed files with 1450 additions and 235 deletions

14
Godeps/Godeps.json generated
View File

@@ -54,6 +54,10 @@
"ImportPath": "github.com/google/btree",
"Rev": "7d79101e329e"
},
{
"ImportPath": "github.com/google/go-cmp",
"Rev": "v0.3.0"
},
{
"ImportPath": "github.com/google/gofuzz",
"Rev": "24818f796faf"
@@ -132,7 +136,7 @@
},
{
"ImportPath": "golang.org/x/net",
"Rev": "65e2d4e15006"
"Rev": "cdfb69ac37fc"
},
{
"ImportPath": "golang.org/x/oauth2",
@@ -180,19 +184,19 @@
},
{
"ImportPath": "gopkg.in/yaml.v2",
"Rev": "v2.2.1"
"Rev": "v2.2.4"
},
{
"ImportPath": "k8s.io/api",
"Rev": "e63b5755afac"
"Rev": "v0.15.9"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "ad85901afca0"
"Rev": "v0.15.9"
},
{
"ImportPath": "k8s.io/klog",
"Rev": "v0.3.0"
"Rev": "v0.3.1"
},
{
"ImportPath": "k8s.io/kube-openapi",

View File

@@ -172,7 +172,7 @@ func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
}
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
return err
}
@@ -191,7 +191,7 @@ func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Obj
return err
}
err = os.Chmod(f.Name(), 0755)
err = os.Chmod(f.Name(), 0660)
if err != nil {
return err
}

View File

@@ -19,6 +19,7 @@ package disk
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
@@ -96,6 +97,32 @@ func TestNewCachedDiscoveryClient_TTL(t *testing.T) {
assert.Equal(c.groupCalls, 2)
}
func TestNewCachedDiscoveryClient_PathPerm(t *testing.T) {
assert := assert.New(t)
d, err := ioutil.TempDir("", "")
assert.NoError(err)
os.RemoveAll(d)
defer os.RemoveAll(d)
c := fakeDiscoveryClient{}
cdc := newCachedDiscoveryClient(&c, d, 1*time.Nanosecond)
cdc.ServerGroups()
err = filepath.Walk(d, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
assert.Equal(os.FileMode(0750), info.Mode().Perm())
} else {
assert.Equal(os.FileMode(0660), info.Mode().Perm())
}
return nil
})
assert.NoError(err)
}
type fakeDiscoveryClient struct {
groupCalls int
resourceCalls int

View File

@@ -18,6 +18,7 @@ package disk
import (
"net/http"
"os"
"path/filepath"
"github.com/gregjones/httpcache"
@@ -35,6 +36,8 @@ type cacheRoundTripper struct {
// corresponding requests.
func newCacheRoundTripper(cacheDir string, rt http.RoundTripper) http.RoundTripper {
d := diskv.New(diskv.Options{
PathPerm: os.FileMode(0750),
FilePerm: os.FileMode(0660),
BasePath: cacheDir,
TempDir: filepath.Join(cacheDir, ".diskv-temp"),
})

View File

@@ -22,7 +22,10 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
)
// copied from k8s.io/client-go/transport/round_trippers_test.go
@@ -93,3 +96,52 @@ func TestCacheRoundTripper(t *testing.T) {
t.Errorf("Invalid content read from cache %q", string(content))
}
}
func TestCacheRoundTripperPathPerm(t *testing.T) {
assert := assert.New(t)
rt := &testRoundTripper{}
cacheDir, err := ioutil.TempDir("", "cache-rt")
os.RemoveAll(cacheDir)
defer os.RemoveAll(cacheDir)
if err != nil {
t.Fatal(err)
}
cache := newCacheRoundTripper(cacheDir, rt)
// First call, caches the response
req := &http.Request{
Method: http.MethodGet,
URL: &url.URL{Host: "localhost"},
}
rt.Response = &http.Response{
Header: http.Header{"ETag": []string{`"123456"`}},
Body: ioutil.NopCloser(bytes.NewReader([]byte("Content"))),
StatusCode: http.StatusOK,
}
resp, err := cache.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
content, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
if string(content) != "Content" {
t.Errorf(`Expected Body to be "Content", got %q`, string(content))
}
err = filepath.Walk(cacheDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
assert.Equal(os.FileMode(0750), info.Mode().Perm())
} else {
assert.Equal(os.FileMode(0660), info.Mode().Perm())
}
return nil
})
assert.NoError(err)
}

View File

@@ -42,7 +42,7 @@ func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultRe
return &dynamicSharedInformerFactory{
client: client,
defaultResync: defaultResync,
namespace: metav1.NamespaceAll,
namespace: namespace,
informers: map[schema.GroupVersionResource]informers.GenericInformer{},
startedInformers: make(map[schema.GroupVersionResource]bool),
tweakListOptions: tweakListOptions,

12
go.mod
View File

@@ -22,13 +22,13 @@ require (
github.com/spf13/pflag v1.0.1
github.com/stretchr/testify v1.2.2
golang.org/x/crypto v0.0.0-20181025213731-e84da0312774
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.0.0-20190511023547-e63b5755afac
k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0
k8s.io/klog v0.3.0
k8s.io/api v0.15.9
k8s.io/apimachinery v0.15.9
k8s.io/klog v0.3.1
k8s.io/utils v0.0.0-20190221042446-c2654d5206da
sigs.k8s.io/yaml v1.1.0
)
@@ -37,6 +37,6 @@ replace (
golang.org/x/sync => golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503
golang.org/x/tools => golang.org/x/tools v0.0.0-20190313210603-aa82965741a9
k8s.io/api => k8s.io/api v0.0.0-20190511023547-e63b5755afac
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0
k8s.io/api => k8s.io/api v0.15.9
k8s.io/apimachinery => k8s.io/apimachinery v0.15.9
)

17
go.sum
View File

@@ -22,8 +22,11 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20160524151835-7d79101e329e h1:JHB7F/4TJCrYBW8+GZO8VkWDj1jxcWuCl6uxKODiyi4=
github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck=
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
@@ -61,8 +64,8 @@ golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 h1:bfLnR+k0tq5Lqt6dflRLcZiz6UaXCMt3vhYJ1l4FQ80=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc h1:gkKoSkUmnU6bpS/VhkuO27bzQeSA51uaEfbOW5dNb68=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a h1:tImsplftrFpALCYumobsd0K86vlAs/eXGFms2txfJfA=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
@@ -88,10 +91,12 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
k8s.io/api v0.0.0-20190511023547-e63b5755afac/go.mod h1:f265Ep4XvHtyggfaaNhtP0AtAFilBaXq2fPjm5kYxwI=
k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0/go.mod h1:5CBnzrKYGHzv9ZsSKmQ8wHt4XI4/TUBPDwYM9FlZMyw=
k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
k8s.io/api v0.15.9/go.mod h1:k1xN2kcg4y3Yn8leWHxx5KW/tfWls8cd7L/0H/bzLdM=
k8s.io/apimachinery v0.15.9/go.mod h1:Xc10RHc1U+F/e9GCloJ8QAeCGevSVP5xhOhqlE+e1kM=
k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/utils v0.0.0-20190221042446-c2654d5206da h1:ElyM7RPonbKnQqOcw7dG2IK5uvQQn3b/WPHqD5mBvP4=

View File

@@ -0,0 +1,98 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1beta1
import (
"fmt"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/types"
)
// The EventExpansion interface allows manually adding extra methods to the EventInterface.
// TODO: Add querying functions to the event expansion
type EventExpansion interface {
// CreateWithEventNamespace is the same as a Create
// except that it sends the request to the event.Namespace.
CreateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error)
// UpdateWithEventNamespace is the same as a Update
// except that it sends the request to the event.Namespace.
UpdateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error)
// PatchWithEventNamespace is the same as an Update
// except that it sends the request to the event.Namespace.
PatchWithEventNamespace(event *v1beta1.Event, data []byte) (*v1beta1.Event, error)
}
// CreateWithEventNamespace makes a new event.
// Returns the copy of the event the server returns, or an error.
// The namespace to create the event within is deduced from the event.
// it must either match this event client's namespace, or this event client must
// have been created with the "" namespace.
func (e *events) CreateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
if e.ns != "" && event.Namespace != e.ns {
return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
}
result := &v1beta1.Event{}
err := e.client.Post().
NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
Resource("events").
Body(event).
Do().
Into(result)
return result, err
}
// UpdateWithEventNamespace modifies an existing event.
// It returns the copy of the event that the server returns, or an error.
// The namespace and key to update the event within is deduced from the event.
// The namespace must either match this event client's namespace, or this event client must have been
// created with the "" namespace.
// Update also requires the ResourceVersion to be set in the event object.
func (e *events) UpdateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
if e.ns != "" && event.Namespace != e.ns {
return nil, fmt.Errorf("can't update an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
}
result := &v1beta1.Event{}
err := e.client.Put().
NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
Resource("events").
Name(event.Name).
Body(event).
Do().
Into(result)
return result, err
}
// PatchWithEventNamespace modifies an existing event.
// It returns the copy of the event that the server returns, or an error.
// The namespace and name of the target event is deduced from the event.
// The namespace must either match this event client's namespace, or this event client must
// have been created with the "" namespace.
func (e *events) PatchWithEventNamespace(event *v1beta1.Event, data []byte) (*v1beta1.Event, error) {
if e.ns != "" && event.Namespace != e.ns {
return nil, fmt.Errorf("can't patch an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
}
result := &v1beta1.Event{}
err := e.client.Patch(types.StrategicMergePatchType).
NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
Resource("events").
Name(event.Name).
Body(data).
Do().
Into(result)
return result, err
}

View File

@@ -0,0 +1,66 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
v1beta1 "k8s.io/api/events/v1beta1"
types "k8s.io/apimachinery/pkg/types"
core "k8s.io/client-go/testing"
)
// CreateWithEventNamespace creats a new event. Returns the copy of the event the server returns, or an error.
func (c *FakeEvents) CreateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
action := core.NewRootCreateAction(eventsResource, event)
if c.ns != "" {
action = core.NewCreateAction(eventsResource, c.ns, event)
}
obj, err := c.Fake.Invokes(action, event)
if obj == nil {
return nil, err
}
return obj.(*v1beta1.Event), err
}
// UpdateWithEventNamespace replaces an existing event. Returns the copy of the event the server returns, or an error.
func (c *FakeEvents) UpdateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
action := core.NewRootUpdateAction(eventsResource, event)
if c.ns != "" {
action = core.NewUpdateAction(eventsResource, c.ns, event)
}
obj, err := c.Fake.Invokes(action, event)
if obj == nil {
return nil, err
}
return obj.(*v1beta1.Event), err
}
// PatchWithEventNamespace patches an existing event. Returns the copy of the event the server returns, or an error.
func (c *FakeEvents) PatchWithEventNamespace(event *v1beta1.Event, data []byte) (*v1beta1.Event, error) {
pt := types.StrategicMergePatchType
action := core.NewRootPatchAction(eventsResource, event.Name, pt, data)
if c.ns != "" {
action = core.NewPatchAction(eventsResource, c.ns, event.Name, pt, data)
}
obj, err := c.Fake.Invokes(action, event)
if obj == nil {
return nil, err
}
return obj.(*v1beta1.Event), err
}

View File

@@ -17,5 +17,3 @@ limitations under the License.
// Code generated by client-gen. DO NOT EDIT.
package v1beta1
type EventExpansion interface{}

View File

@@ -287,7 +287,7 @@ func (ts *azureTokenSource) refreshToken(token *azureToken) (*azureToken, error)
return nil, err
}
oauthConfig, err := adal.NewOAuthConfig(env.ActiveDirectoryEndpoint, token.tenantID)
oauthConfig, err := adal.NewOAuthConfigWithAPIVersion(env.ActiveDirectoryEndpoint, token.tenantID, nil)
if err != nil {
return nil, fmt.Errorf("building the OAuth configuration for token refresh: %v", err)
}
@@ -344,7 +344,7 @@ func newAzureTokenSourceDeviceCode(environment azure.Environment, clientID strin
}
func (ts *azureTokenSourceDeviceCode) Token() (*azureToken, error) {
oauthConfig, err := adal.NewOAuthConfig(ts.environment.ActiveDirectoryEndpoint, ts.tenantID)
oauthConfig, err := adal.NewOAuthConfigWithAPIVersion(ts.environment.ActiveDirectoryEndpoint, ts.tenantID, nil)
if err != nil {
return nil, fmt.Errorf("building the OAuth configuration for device code authentication: %v", err)
}

View File

@@ -74,9 +74,10 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
KeyFile: c.KeyFile,
KeyData: c.KeyData,
},
Username: c.Username,
Password: c.Password,
BearerToken: c.BearerToken,
Username: c.Username,
Password: c.Password,
BearerToken: c.BearerToken,
BearerTokenFile: c.BearerTokenFile,
Impersonate: transport.ImpersonationConfig{
UserName: c.Impersonate.UserName,
Groups: c.Impersonate.Groups,

View File

@@ -105,7 +105,7 @@ func LoadFromFile(path string) (*Info, error) {
// The fields of client.Config with a corresponding field in the Info are set
// with the value from the Info.
func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error) {
var config restclient.Config = c
var config = c
config.Username = info.User
config.Password = info.Password
config.CAFile = info.CAFile
@@ -118,6 +118,7 @@ func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error)
return config, nil
}
// Complete returns true if the Kubernetes API authorization info is complete.
func (info Info) Complete() bool {
return len(info.User) > 0 ||
len(info.CertFile) > 0 ||

View File

@@ -295,13 +295,6 @@ func isDeletionDup(a, b *Delta) *Delta {
return b
}
// willObjectBeDeletedLocked returns true only if the last delta for the
// given object is Delete. Caller must lock first.
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
deltas := f.items[id]
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
}
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
@@ -310,13 +303,6 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
return KeyError{obj, err}
}
// If object is supposed to be deleted (last event is Deleted),
// then we should ignore Sync events, because it would result in
// recreation of this object.
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)

View File

@@ -85,6 +85,33 @@ func TestDeltaFIFO_basic(t *testing.T) {
}
}
// TestDeltaFIFO_replaceWithDeleteDeltaIn tests that a `Sync` delta for an
// object `O` with ID `X` is added when .Replace is called and `O` is among the
// replacement objects even if the DeltaFIFO already stores in terminal position
// a delta of type `Delete` for ID `X`. Not adding the `Sync` delta causes
// SharedIndexInformers to miss `O`'s create notification, see https://github.com/kubernetes/kubernetes/issues/83810
// for more details.
func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
oldObj := mkFifoObj("foo", 1)
newObj := mkFifoObj("foo", 2)
f := NewDeltaFIFO(testFifoObjectKeyFunc, keyLookupFunc(func() []testFifoObject {
return []testFifoObject{oldObj}
}))
f.Delete(oldObj)
f.Replace([]interface{}{newObj}, "")
actualDeltas := Pop(f)
expectedDeltas := Deltas{
Delta{Type: Deleted, Object: oldObj},
Delta{Type: Sync, Object: newObj},
}
if !reflect.DeepEqual(expectedDeltas, actualDeltas) {
t.Errorf("expected %#v, got %#v", expectedDeltas, actualDeltas)
}
}
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)

View File

@@ -83,7 +83,7 @@ var (
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
return indexer, reflector
}

View File

@@ -228,6 +228,7 @@ func (config *DirectClientConfig) getUserIdentificationPartialConfig(configAuthI
// blindly overwrite existing values based on precedence
if len(configAuthInfo.Token) > 0 {
mergedConfig.BearerToken = configAuthInfo.Token
mergedConfig.BearerTokenFile = configAuthInfo.TokenFile
} else if len(configAuthInfo.TokenFile) > 0 {
tokenBytes, err := ioutil.ReadFile(configAuthInfo.TokenFile)
if err != nil {
@@ -489,8 +490,9 @@ func (config *inClusterClientConfig) ClientConfig() (*restclient.Config, error)
if server := config.overrides.ClusterInfo.Server; len(server) > 0 {
icc.Host = server
}
if token := config.overrides.AuthInfo.Token; len(token) > 0 {
icc.BearerToken = token
if len(config.overrides.AuthInfo.Token) > 0 || len(config.overrides.AuthInfo.TokenFile) > 0 {
icc.BearerToken = config.overrides.AuthInfo.Token
icc.BearerTokenFile = config.overrides.AuthInfo.TokenFile
}
if certificateAuthorityFile := config.overrides.ClusterInfo.CertificateAuthority; len(certificateAuthorityFile) > 0 {
icc.TLSClientConfig.CAFile = certificateAuthorityFile

View File

@@ -548,6 +548,30 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
},
},
},
{
overrides: &ConfigOverrides{
ClusterInfo: clientcmdapi.Cluster{
Server: "https://host-from-overrides.com",
CertificateAuthority: "/path/to/ca-from-overrides.crt",
},
AuthInfo: clientcmdapi.AuthInfo{
Token: "token-from-override",
TokenFile: "tokenfile-from-override",
},
},
},
{
overrides: &ConfigOverrides{
ClusterInfo: clientcmdapi.Cluster{
Server: "https://host-from-overrides.com",
CertificateAuthority: "/path/to/ca-from-overrides.crt",
},
AuthInfo: clientcmdapi.AuthInfo{
Token: "",
TokenFile: "tokenfile-from-override",
},
},
},
{
overrides: &ConfigOverrides{},
},
@@ -556,13 +580,15 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
for _, tc := range tt {
expectedServer := "https://host-from-cluster.com"
expectedToken := "token-from-cluster"
expectedTokenFile := "tokenfile-from-cluster"
expectedCAFile := "/path/to/ca-from-cluster.crt"
icc := &inClusterClientConfig{
inClusterConfigProvider: func() (*restclient.Config, error) {
return &restclient.Config{
Host: expectedServer,
BearerToken: expectedToken,
Host: expectedServer,
BearerToken: expectedToken,
BearerTokenFile: expectedTokenFile,
TLSClientConfig: restclient.TLSClientConfig{
CAFile: expectedCAFile,
},
@@ -579,8 +605,9 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
if overridenServer := tc.overrides.ClusterInfo.Server; len(overridenServer) > 0 {
expectedServer = overridenServer
}
if overridenToken := tc.overrides.AuthInfo.Token; len(overridenToken) > 0 {
expectedToken = overridenToken
if len(tc.overrides.AuthInfo.Token) > 0 || len(tc.overrides.AuthInfo.TokenFile) > 0 {
expectedToken = tc.overrides.AuthInfo.Token
expectedTokenFile = tc.overrides.AuthInfo.TokenFile
}
if overridenCAFile := tc.overrides.ClusterInfo.CertificateAuthority; len(overridenCAFile) > 0 {
expectedCAFile = overridenCAFile
@@ -592,6 +619,9 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
if clientConfig.BearerToken != expectedToken {
t.Errorf("Expected token %v, got %v", expectedToken, clientConfig.BearerToken)
}
if clientConfig.BearerTokenFile != expectedTokenFile {
t.Errorf("Expected tokenfile %v, got %v", expectedTokenFile, clientConfig.BearerTokenFile)
}
if clientConfig.TLSClientConfig.CAFile != expectedCAFile {
t.Errorf("Expected Certificate Authority %v, got %v", expectedCAFile, clientConfig.TLSClientConfig.CAFile)
}

View File

@@ -0,0 +1,312 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"os"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
typedv1beta1 "k8s.io/client-go/kubernetes/typed/events/v1beta1"
"k8s.io/client-go/tools/record/util"
"k8s.io/klog"
)
const (
maxTriesPerEvent = 12
finishTime = 6 * time.Minute
refreshTime = 30 * time.Minute
maxQueuedEvents = 1000
)
var defaultSleepDuration = 10 * time.Second
// TODO: validate impact of copying and investigate hashing
type eventKey struct {
action string
reason string
reportingController string
reportingInstance string
regarding corev1.ObjectReference
related corev1.ObjectReference
}
type eventBroadcasterImpl struct {
*watch.Broadcaster
mu sync.Mutex
eventCache map[eventKey]*v1beta1.Event
sleepDuration time.Duration
sink EventSink
}
// EventSinkImpl wraps EventInterface to implement EventSink.
// TODO: this makes it easier for testing purpose and masks the logic of performing API calls.
// Note that rollbacking to raw clientset should also be transparent.
type EventSinkImpl struct {
Interface typedv1beta1.EventInterface
}
// Create is the same as CreateWithEventNamespace of the EventExpansion
func (e *EventSinkImpl) Create(event *v1beta1.Event) (*v1beta1.Event, error) {
return e.Interface.CreateWithEventNamespace(event)
}
// Update is the same as UpdateithEventNamespace of the EventExpansion
func (e *EventSinkImpl) Update(event *v1beta1.Event) (*v1beta1.Event, error) {
return e.Interface.UpdateWithEventNamespace(event)
}
// Patch is the same as PatchWithEventNamespace of the EventExpansion
func (e *EventSinkImpl) Patch(event *v1beta1.Event, data []byte) (*v1beta1.Event, error) {
return e.Interface.PatchWithEventNamespace(event, data)
}
// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*v1beta1.Event{})
}
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*v1beta1.Event) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
eventCache: eventCache,
sleepDuration: sleepDuration,
sink: sink,
}
}
// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
if event.Series != nil {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
e.eventCache[isomorphicKey] = recordedEvent
}
}
}
}
// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
eventSerie := event.Series
if eventSerie != nil {
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
if _, retry := recordEvent(e.sink, event); !retry {
delete(e.eventCache, isomorphicKey)
}
}
} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
delete(e.eventCache, isomorphicKey)
}
}
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
hostname, _ := os.Hostname()
reportingInstance := reportingController + "-" + hostname
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
}
func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) {
// Make a copy before modification, because there could be multiple listeners.
eventCopy := event.DeepCopy()
go func() {
evToRecord := func() *v1beta1.Event {
e.mu.Lock()
defer e.mu.Unlock()
eventKey := getKey(eventCopy)
isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
if isIsomorphic {
if isomorphicEvent.Series != nil {
isomorphicEvent.Series.Count++
isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
return nil
}
isomorphicEvent.Series = &v1beta1.EventSeries{
Count: 1,
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
}
return isomorphicEvent
}
e.eventCache[eventKey] = eventCopy
return eventCopy
}()
if evToRecord != nil {
recordedEvent := e.attemptRecording(evToRecord)
if recordedEvent != nil {
recordedEventKey := getKey(recordedEvent)
e.mu.Lock()
defer e.mu.Unlock()
e.eventCache[recordedEventKey] = recordedEvent
}
}
}()
}
func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event {
tries := 0
for {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
return recordedEvent
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
return nil
}
// Randomize sleep so that various clients won't all be
// synced up if the master goes down.
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
}
}
func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
var newEvent *v1beta1.Event
var err error
isEventSeries := event.Series != nil
if isEventSeries {
patch, err := createPatchBytesForSeries(event)
if err != nil {
klog.Errorf("Unable to calculate diff, no merge is possible: %v", err)
return nil, false
}
newEvent, err = sink.Patch(event, patch)
}
// Update can fail because the event may have been removed and it no longer exists.
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
// Making sure that ResourceVersion is empty on creation
event.ResourceVersion = ""
newEvent, err = sink.Create(event)
}
if err == nil {
return newEvent, false
}
// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
return nil, false
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
}
return nil, false
case *errors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return nil, true
}
func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) {
oldEvent := event.DeepCopy()
oldEvent.Series = nil
oldData, err := json.Marshal(oldEvent)
if err != nil {
return nil, err
}
newData, err := json.Marshal(event)
if err != nil {
return nil, err
}
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{})
}
func getKey(event *v1beta1.Event) eventKey {
key := eventKey{
action: event.Action,
reason: event.Reason,
reportingController: event.ReportingController,
reportingInstance: event.ReportingInstance,
regarding: event.Regarding,
}
if event.Related != nil {
key.related = *event.Related
}
return key
}
// startEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) startEventWatcher(eventHandler func(event runtime.Object)) func() {
watcher := e.Watch()
go func() {
defer utilruntime.HandleCrash()
for {
watchEvent, ok := <-watcher.ResultChan()
if !ok {
return
}
eventHandler(watchEvent.Object)
}
}()
return watcher.Stop
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(func() {
e.refreshExistingEventSeries()
}, refreshTime, stopCh)
go wait.Until(func() {
e.finishSeries()
}, finishTime, stopCh)
eventHandler := func(obj runtime.Object) {
event, ok := obj.(*v1beta1.Event)
if !ok {
klog.Errorf("unexpected type, expected v1beta1.Event")
return
}
e.recordToSink(event, clock.RealClock{})
}
stopWatcher := e.startEventWatcher(eventHandler)
go func() {
<-stopCh
stopWatcher()
}()
}

View File

@@ -0,0 +1,92 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"fmt"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/reference"
"k8s.io/api/events/v1beta1"
"k8s.io/client-go/tools/record/util"
"k8s.io/klog"
)
type recorderImpl struct {
scheme *runtime.Scheme
reportingController string
reportingInstance string
*watch.Broadcaster
clock clock.Clock
}
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
timestamp := metav1.MicroTime{time.Now()}
message := fmt.Sprintf(note, args...)
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
return
}
refRelated, err := reference.GetReference(recorder.scheme, related)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'.", related, err)
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
go func() {
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *v1beta1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := refRegarding.Namespace
if namespace == "" {
namespace = metav1.NamespaceSystem
}
return &v1beta1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
Namespace: namespace,
},
EventTime: timestamp,
Series: nil,
ReportingController: reportingController,
ReportingInstance: reportingInstance,
Action: action,
Reason: reason,
Regarding: *refRegarding,
Related: refRelated,
Note: message,
Type: eventtype,
// TODO: remove this when we change conversion to convert eventSource
// to reportingController
DeprecatedSource: v1.EventSource{Component: reportingController},
}
}

View File

@@ -0,0 +1,349 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"strconv"
"testing"
"time"
"os"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/api/events/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
)
type testEventSeriesSink struct {
OnCreate func(e *v1beta1.Event) (*v1beta1.Event, error)
OnUpdate func(e *v1beta1.Event) (*v1beta1.Event, error)
OnPatch func(e *v1beta1.Event, p []byte) (*v1beta1.Event, error)
}
// Create records the event for testing.
func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) {
if t.OnCreate != nil {
return t.OnCreate(e)
}
return e, nil
}
// Update records the event for testing.
func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) {
if t.OnUpdate != nil {
return t.OnUpdate(e)
}
return e, nil
}
// Patch records the event for testing.
func (t *testEventSeriesSink) Patch(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) {
if t.OnPatch != nil {
return t.OnPatch(e, p)
}
return e, nil
}
func TestEventSeriesf(t *testing.T) {
hostname, _ := os.Hostname()
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/api/version/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
}
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
if err != nil {
t.Fatal(err)
}
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
if err != nil {
t.Fatal(err)
}
expectedEvent := &v1beta1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "baz",
},
EventTime: metav1.MicroTime{time.Now()},
ReportingController: "eventTest",
ReportingInstance: "eventTest-" + hostname,
Action: "started",
Reason: "test",
Regarding: *regarding,
Related: related,
Note: "some verbose message: 1",
Type: v1.EventTypeNormal,
}
isomorphicEvent := expectedEvent.DeepCopy()
nonIsomorphicEvent := expectedEvent.DeepCopy()
nonIsomorphicEvent.Action = "stopped"
expectedEvent.Series = &v1beta1.EventSeries{Count: 1}
table := []struct {
regarding k8sruntime.Object
related k8sruntime.Object
actual *v1beta1.Event
elements []interface{}
expect *v1beta1.Event
expectUpdate bool
}{
{
regarding: regarding,
related: related,
actual: isomorphicEvent,
elements: []interface{}{1},
expect: expectedEvent,
expectUpdate: true,
},
{
regarding: regarding,
related: related,
actual: nonIsomorphicEvent,
elements: []interface{}{1},
expect: nonIsomorphicEvent,
expectUpdate: false,
},
}
stopCh := make(chan struct{})
createEvent := make(chan *v1beta1.Event)
updateEvent := make(chan *v1beta1.Event)
patchEvent := make(chan *v1beta1.Event)
testEvents := testEventSeriesSink{
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
createEvent <- event
return event, nil
},
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
// event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here
// we'll use it directly
patchEvent <- event
return event, nil
},
}
eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*v1beta1.Event{})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
eventBroadcaster.StartRecordingToSink(stopCh)
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
// read from the chan as this was needed only to populate the cache
<-createEvent
for index, item := range table {
actual := item.actual
recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
t.Logf("%v - validating event affected by patch request", index)
validateEventSerie(strconv.Itoa(index), true, actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
t.Logf("%v - validating event affected by a create request", index)
validateEventSerie(strconv.Itoa(index), false, actualEvent, item.expect, t)
}
}
close(stopCh)
}
func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *v1beta1.Event, expectedEvent *v1beta1.Event, t *testing.T) {
recvEvent := *actualEvent
// Just check that the timestamp was set.
if recvEvent.EventTime.IsZero() {
t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
}
if expectedUpdate {
if recvEvent.Series == nil {
t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series)
} else {
if recvEvent.Series.Count != expectedEvent.Series.Count {
t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
}
}
// Check that name has the right prefix.
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
}
} else {
if recvEvent.Series != nil {
t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
}
}
}
func TestFinishSeries(t *testing.T) {
hostname, _ := os.Hostname()
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/api/version/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
}
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
if err != nil {
t.Fatal(err)
}
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
if err != nil {
t.Fatal(err)
}
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
createEvent := make(chan *v1beta1.Event, 10)
updateEvent := make(chan *v1beta1.Event, 10)
patchEvent := make(chan *v1beta1.Event, 10)
testEvents := testEventSeriesSink{
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
createEvent <- event
return event, nil
},
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
// event we receive is already patched, usually the sink uses it
// only to retrieve the name and namespace, here we'll use it directly
patchEvent <- event
return event, nil
},
}
cache := map[eventKey]*v1beta1.Event{}
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
nonFinishedEvent := cachedEvent.DeepCopy()
nonFinishedEvent.ReportingController = "nonFinished-controller"
cachedEvent.Series = &v1beta1.EventSeries{
Count: 10,
LastObservedTime: LastObservedTime,
}
cache[getKey(cachedEvent)] = cachedEvent
cache[getKey(nonFinishedEvent)] = nonFinishedEvent
eventBroadcaster.finishSeries()
select {
case actualEvent := <-patchEvent:
t.Logf("validating event affected by patch request")
eventBroadcaster.mu.Lock()
defer eventBroadcaster.mu.Unlock()
if len(cache) != 1 {
t.Errorf("cache should be empty, but instead got a size of %v", len(cache))
}
if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
}
// check that we emitted only one event
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
}
}
func TestRefreshExistingEventSeries(t *testing.T) {
hostname, _ := os.Hostname()
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/api/version/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
}
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
if err != nil {
t.Fatal(err)
}
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
if err != nil {
t.Fatal(err)
}
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
createEvent := make(chan *v1beta1.Event, 10)
updateEvent := make(chan *v1beta1.Event, 10)
patchEvent := make(chan *v1beta1.Event, 10)
testEvents := testEventSeriesSink{
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
createEvent <- event
return event, nil
},
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
// event we receive is already patched, usually the sink uses it
//only to retrieve the name and namespace, here we'll use it directly.
patchEvent <- event
return event, nil
},
}
cache := map[eventKey]*v1beta1.Event{}
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
cachedEvent.Series = &v1beta1.EventSeries{
Count: 10,
LastObservedTime: LastObservedTime,
}
cacheKey := getKey(cachedEvent)
cache[cacheKey] = cachedEvent
eventBroadcaster.refreshExistingEventSeries()
select {
case <-patchEvent:
t.Logf("validating event affected by patch request")
eventBroadcaster.mu.Lock()
defer eventBroadcaster.mu.Unlock()
if len(cache) != 1 {
t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
}
// check that we emitted only one event
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
}
}

View File

@@ -0,0 +1,58 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
)
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Eventf constructs an event from the given information and puts it in the queue for sending.
// 'regarding' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers
// a creation or deletion of related object.
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'note' is intended to be human readable.
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
StartRecordingToSink(stopCh <-chan struct{})
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder
}
// EventSink knows how to store events (client-go implements it.)
// EventSink must respect the namespace that will be embedded in 'event'.
// It is assumed that EventSink will return the same sorts of errors as
// client-go's REST client.
type EventSink interface {
Create(event *v1beta1.Event) (*v1beta1.Event, error)
Update(event *v1beta1.Event) (*v1beta1.Event, error)
Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error)
}

View File

@@ -16,12 +16,16 @@ limitations under the License.
// Package leaderelection implements leader election of a set of endpoints.
// It uses an annotation in the endpoints object to store the record of the
// election state.
// election state. This implementation does not guarantee that only one
// client is acting as a leader (a.k.a. fencing).
//
// This implementation does not guarantee that only one client is acting as a
// leader (a.k.a. fencing). A client observes timestamps captured locally to
// infer the state of the leader election. Thus the implementation is tolerant
// to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate.
// A client only acts on timestamps captured locally to infer the state of the
// leader election. The client does not consider timestamps in the leader
// election record to be accurate because these timestamps may not have been
// produced by a local clock. The implemention does not depend on their
// accuracy and only uses their change to indicate that another client has
// renewed the leader lease. Thus the implementation is tolerant to arbitrary
// clock skew, but is not tolerant to arbitrary clock skew rate.
//
// However the level of tolerance to skew rate can be configured by setting
// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
@@ -105,12 +109,26 @@ type LeaderElectionConfig struct {
// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of
// last observed ack.
//
// A client needs to wait a full LeaseDuration without observing a change to
// the record before it can attempt to take over. When all clients are
// shutdown and a new set of clients are started with different names against
// the same leader record, they must wait the full LeaseDuration before
// attempting to acquire the lease. Thus LeaseDuration should be as short as
// possible (within your tolerance for clock skew rate) to avoid a possible
// long waits in the scenario.
//
// Core clients default this value to 15 seconds.
LeaseDuration time.Duration
// RenewDeadline is the duration that the acting master will retry
// refreshing leadership before giving up.
//
// Core clients default this value to 10 seconds.
RenewDeadline time.Duration
// RetryPeriod is the duration the LeaderElector clients should wait
// between tries of actions.
//
// Core clients default this value to 2 seconds.
RetryPeriod time.Duration
// Callbacks are callbacks that are triggered during certain lifecycle

View File

@@ -33,8 +33,8 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
)
// PortForwardProtocolV1Name is the subprotocol used for port forwarding.
// TODO move to API machinery and re-unify with kubelet/server/portfoward
// The subprotocol "portforward.k8s.io" is used for port forwarding.
const PortForwardProtocolV1Name = "portforward.k8s.io"
// PortForwarder knows how to listen for local connections and forward them to
@@ -401,6 +401,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
}
}
// Close stops all listeners of PortForwarder.
func (pf *PortForwarder) Close() {
// stop all listeners
for _, l := range pf.listeners {

View File

@@ -153,7 +153,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -162,7 +162,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -181,7 +181,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
},
Reason: "Killed",
Message: "some other verbose message: 1",
@@ -189,7 +189,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectUpdate: false,
},
{
@@ -208,7 +208,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -217,7 +217,7 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -236,7 +236,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Started",
@@ -245,7 +245,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -264,7 +264,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -273,7 +273,7 @@ func TestEventf(t *testing.T) {
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -292,7 +292,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -301,7 +301,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -320,7 +320,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -329,7 +329,7 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: true,
},
}
@@ -509,7 +509,7 @@ func TestLotsOfEvents(t *testing.T) {
Name: fmt.Sprintf("foo-%v", i),
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
}
// we need to vary the reason to prevent aggregation
go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
@@ -567,7 +567,7 @@ func TestEventfNoNamespace(t *testing.T) {
Name: "foo",
Namespace: "",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -576,7 +576,7 @@ func TestEventfNoNamespace(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
}
@@ -677,7 +677,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -686,7 +686,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -705,7 +705,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
},
Reason: "Killed",
Message: "some other verbose message: 1",
@@ -713,7 +713,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectUpdate: false,
},
{
@@ -732,7 +732,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -741,7 +741,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -760,7 +760,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Started",
@@ -769,7 +769,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -788,7 +788,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -797,7 +797,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -816,7 +816,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -825,7 +825,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -844,7 +844,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "version",
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -853,7 +853,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: true,
},
}

View File

@@ -19,8 +19,6 @@ package reference
import (
"errors"
"fmt"
"net/url"
"strings"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
@@ -30,8 +28,7 @@ import (
var (
// Errors that could be returned by GetReference.
ErrNilObject = errors.New("can't reference a nil object")
ErrNoSelfLink = errors.New("selfLink was empty, can't make reference")
ErrNilObject = errors.New("can't reference a nil object")
)
// GetReference returns an ObjectReference which refers to the given
@@ -47,20 +44,6 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
return ref, nil
}
gvk := obj.GetObjectKind().GroupVersionKind()
// if the object referenced is actually persisted, we can just get kind from meta
// if we are building an object reference to something not yet persisted, we should fallback to scheme
kind := gvk.Kind
if len(kind) == 0 {
// TODO: this is wrong
gvks, _, err := scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
kind = gvks[0].Kind
}
// An object that implements only List has enough metadata to build a reference
var listMeta metav1.Common
objectMeta, err := meta.Accessor(obj)
@@ -73,29 +56,29 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
listMeta = objectMeta
}
// if the object referenced is actually persisted, we can also get version from meta
version := gvk.GroupVersion().String()
if len(version) == 0 {
selfLink := listMeta.GetSelfLink()
if len(selfLink) == 0 {
return nil, ErrNoSelfLink
}
selfLinkUrl, err := url.Parse(selfLink)
gvk := obj.GetObjectKind().GroupVersionKind()
// If object meta doesn't contain data about kind and/or version,
// we are falling back to scheme.
//
// TODO: This doesn't work for CRDs, which are not registered in scheme.
if gvk.Empty() {
gvks, _, err := scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
// example paths: /<prefix>/<version>/*
parts := strings.Split(selfLinkUrl.Path, "/")
if len(parts) < 4 {
return nil, fmt.Errorf("unexpected self link format: '%v'; got version '%v'", selfLink, version)
}
if parts[1] == "api" {
version = parts[2]
} else {
version = parts[2] + "/" + parts[3]
if len(gvks) == 0 || gvks[0].Empty() {
return nil, fmt.Errorf("unexpected gvks registered for object %T: %v", obj, gvks)
}
// TODO: The same object can be registered for multiple group versions
// (although in practise this doesn't seem to be used).
// In such case, the version set may not be correct.
gvk = gvks[0]
}
kind := gvk.Kind
version := gvk.GroupVersion().String()
// only has list metadata
if objectMeta == nil {
return &v1.ObjectReference{

View File

@@ -37,29 +37,31 @@ func TestGetReferenceRefVersion(t *testing.T) {
tests := []struct {
name string
input *TestRuntimeObj
groupVersion schema.GroupVersion
expectedRefVersion string
}{
{
name: "api from selflink",
name: "v1 GV from scheme",
input: &TestRuntimeObj{
ObjectMeta: metav1.ObjectMeta{SelfLink: "/api/v1/namespaces"},
ObjectMeta: metav1.ObjectMeta{SelfLink: "/bad-selflink/unused"},
},
groupVersion: schema.GroupVersion{Group: "", Version: "v1"},
expectedRefVersion: "v1",
},
{
name: "foo.group/v3 from selflink",
name: "foo.group/v3 GV from scheme",
input: &TestRuntimeObj{
ObjectMeta: metav1.ObjectMeta{SelfLink: "/apis/foo.group/v3/namespaces"},
ObjectMeta: metav1.ObjectMeta{SelfLink: "/bad-selflink/unused"},
},
groupVersion: schema.GroupVersion{Group: "foo.group", Version: "v3"},
expectedRefVersion: "foo.group/v3",
},
}
scheme := runtime.NewScheme()
scheme.AddKnownTypes(schema.GroupVersion{Group: "this", Version: "is ignored"}, &TestRuntimeObj{})
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(test.groupVersion, &TestRuntimeObj{})
ref, err := GetReference(scheme, test.input)
if err != nil {
t.Fatal(err)

View File

@@ -18,42 +18,86 @@ package watch
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func newTicketer() *ticketer {
return &ticketer{
func newEventProcessor(out chan<- watch.Event) *eventProcessor {
return &eventProcessor{
out: out,
cond: sync.NewCond(&sync.Mutex{}),
done: make(chan struct{}),
}
}
type ticketer struct {
counter uint64
// eventProcessor buffers events and writes them to an out chan when a reader
// is waiting. Because of the requirement to buffer events, it synchronizes
// input with a condition, and synchronizes output with a channels. It needs to
// be able to yield while both waiting on an input condition and while blocked
// on writing to the output channel.
type eventProcessor struct {
out chan<- watch.Event
cond *sync.Cond
current uint64
cond *sync.Cond
buff []watch.Event
done chan struct{}
}
func (t *ticketer) GetTicket() uint64 {
// -1 to start from 0
return atomic.AddUint64(&t.counter, 1) - 1
func (e *eventProcessor) run() {
for {
batch := e.takeBatch()
e.writeBatch(batch)
if e.stopped() {
return
}
}
}
func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
t.cond.L.Lock()
defer t.cond.L.Unlock()
for ticket != t.current {
t.cond.Wait()
func (e *eventProcessor) takeBatch() []watch.Event {
e.cond.L.Lock()
defer e.cond.L.Unlock()
for len(e.buff) == 0 && !e.stopped() {
e.cond.Wait()
}
f()
batch := e.buff
e.buff = nil
return batch
}
t.current++
t.cond.Broadcast()
func (e *eventProcessor) writeBatch(events []watch.Event) {
for _, event := range events {
select {
case e.out <- event:
case <-e.done:
return
}
}
}
func (e *eventProcessor) push(event watch.Event) {
e.cond.L.Lock()
defer e.cond.L.Unlock()
defer e.cond.Signal()
e.buff = append(e.buff, event)
}
func (e *eventProcessor) stopped() bool {
select {
case <-e.done:
return true
default:
return false
}
}
func (e *eventProcessor) stop() {
close(e.done)
e.cond.Signal()
}
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
@@ -61,55 +105,44 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
// it also returns a channel you can use to wait for the informers to fully shutdown.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
ch := make(chan watch.Event)
doneCh := make(chan struct{})
w := watch.NewProxyWatcher(ch)
t := newTicketer()
e := newEventProcessor(ch)
indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Added,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
e.push(watch.Event{
Type: watch.Added,
Object: obj.(runtime.Object),
})
},
UpdateFunc: func(old, new interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Modified,
Object: new.(runtime.Object),
}:
case <-w.StopChan():
}
e.push(watch.Event{
Type: watch.Modified,
Object: new.(runtime.Object),
})
},
DeleteFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
if stale {
// We have no means of passing the additional information down using watch API based on watch.Event
// but the caller can filter such objects by checking if metadata.deletionTimestamp is set
obj = staleObj
}
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
if stale {
// We have no means of passing the additional information down using
// watch API based on watch.Event but the caller can filter such
// objects by checking if metadata.deletionTimestamp is set
obj = staleObj
}
select {
case ch <- watch.Event{
Type: watch.Deleted,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
e.push(watch.Event{
Type: watch.Deleted,
Object: obj.(runtime.Object),
})
},
}, cache.Indexers{})
go e.run()
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
defer e.stop()
informer.Run(w.StopChan())
}()

View File

@@ -17,8 +17,9 @@ limitations under the License.
package watch
import (
"math/rand"
"context"
"reflect"
goruntime "runtime"
"sort"
"testing"
"time"
@@ -28,6 +29,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/watch"
fakeclientset "k8s.io/client-go/kubernetes/fake"
@@ -35,6 +37,86 @@ import (
"k8s.io/client-go/tools/cache"
)
// TestEventProcessorExit is expected to timeout if the event processor fails
// to exit when stopped.
func TestEventProcessorExit(t *testing.T) {
event := watch.Event{}
tests := []struct {
name string
write func(e *eventProcessor)
}{
{
name: "exit on blocked read",
write: func(e *eventProcessor) {
e.push(event)
},
},
{
name: "exit on blocked write",
write: func(e *eventProcessor) {
e.push(event)
e.push(event)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
out := make(chan watch.Event)
e := newEventProcessor(out)
test.write(e)
exited := make(chan struct{})
go func() {
e.run()
close(exited)
}()
<-out
e.stop()
goruntime.Gosched()
<-exited
})
}
}
type apiInt int
func (apiInt) GetObjectKind() schema.ObjectKind { return nil }
func (apiInt) DeepCopyObject() runtime.Object { return nil }
func TestEventProcessorOrdersEvents(t *testing.T) {
out := make(chan watch.Event)
e := newEventProcessor(out)
go e.run()
numProcessed := 0
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
go func() {
for i := 0; i < 1000; i++ {
e := <-out
if got, want := int(e.Object.(apiInt)), i; got != want {
t.Errorf("unexpected event: got=%d, want=%d", got, want)
}
numProcessed++
}
cancel()
}()
for i := 0; i < 1000; i++ {
e.push(watch.Event{Object: apiInt(i)})
}
<-ctx.Done()
e.stop()
if numProcessed != 1000 {
t.Errorf("unexpected number of events processed: %d", numProcessed)
}
}
type byEventTypeAndName []watch.Event
func (a byEventTypeAndName) Len() int { return len(a) }
@@ -51,44 +133,6 @@ func (a byEventTypeAndName) Less(i, j int) bool {
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
}
func TestTicketer(t *testing.T) {
tg := newTicketer()
const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines
var tickets []uint64
for i := 0; i < numTickets; i++ {
ticket := tg.GetTicket()
tickets = append(tickets, ticket)
exp, got := uint64(i), ticket
if got != exp {
t.Fatalf("expected ticket %d, got %d", exp, got)
}
}
// shuffle tickets
rand.Shuffle(len(tickets), func(i, j int) {
tickets[i], tickets[j] = tickets[j], tickets[i]
})
res := make(chan uint64, len(tickets))
for _, ticket := range tickets {
go func(ticket uint64) {
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
tg.WaitForTicket(ticket, func() {
res <- ticket
})
}(ticket)
}
for i := 0; i < numTickets; i++ {
exp, got := uint64(i), <-res
if got != exp {
t.Fatalf("expected ticket %d, got %d", exp, got)
}
}
}
func TestNewInformerWatcher(t *testing.T) {
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
tt := []struct {

View File

@@ -17,6 +17,7 @@ limitations under the License.
package certificate
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
cryptorand "crypto/rand"
@@ -148,9 +149,13 @@ type CSRClientFunc func(current *tls.Certificate) (certificatesclient.Certificat
func (e *NoCertKeyError) Error() string { return string(*e) }
type manager struct {
getTemplate func() *x509.CertificateRequest
lastRequestLock sync.Mutex
lastRequest *x509.CertificateRequest
getTemplate func() *x509.CertificateRequest
// lastRequestLock guards lastRequestCancel and lastRequest
lastRequestLock sync.Mutex
lastRequestCancel context.CancelFunc
lastRequest *x509.CertificateRequest
dynamicTemplate bool
usages []certificates.KeyUsage
forceRotation bool
@@ -261,7 +266,8 @@ func (m *manager) Start() {
case <-timer.C:
// unblock when deadline expires
case <-templateChanged:
if reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
_, lastRequestTemplate := m.getLastRequest()
if reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
// if the template now matches what we last requested, restart the rotation deadline loop
return
}
@@ -289,10 +295,19 @@ func (m *manager) Start() {
if m.dynamicTemplate {
go wait.Until(func() {
// check if the current template matches what we last requested
if !m.certSatisfiesTemplate() && !reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
lastRequestCancel, lastRequestTemplate := m.getLastRequest()
if !m.certSatisfiesTemplate() && !reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
// if the template is different, queue up an interrupt of the rotation deadline loop.
// if we've requested a CSR that matches the new template by the time the interrupt is handled, the interrupt is disregarded.
templateChanged <- struct{}{}
if lastRequestCancel != nil {
// if we're currently waiting on a submitted request that no longer matches what we want, stop waiting
lastRequestCancel()
}
select {
case templateChanged <- struct{}{}:
case <-m.stopCh:
}
}
}, time.Second, m.stopCh)
}
@@ -386,12 +401,15 @@ func (m *manager) rotateCerts() (bool, error) {
return false, m.updateServerError(err)
}
ctx, cancel := context.WithTimeout(context.Background(), certificateWaitTimeout)
defer cancel()
// Once we've successfully submitted a CSR for this template, record that we did so
m.setLastRequest(template)
m.setLastRequest(cancel, template)
// Wait for the certificate to be signed. This interface and internal timout
// is a remainder after the old design using raw watch wrapped with backoff.
crtPEM, err := csr.WaitForCertificate(client, req, certificateWaitTimeout)
crtPEM, err := csr.WaitForCertificate(ctx, client, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
return false, nil
@@ -561,14 +579,15 @@ func (m *manager) generateCSR() (template *x509.CertificateRequest, csrPEM []byt
return template, csrPEM, keyPEM, privateKey, nil
}
func (m *manager) getLastRequest() *x509.CertificateRequest {
func (m *manager) getLastRequest() (context.CancelFunc, *x509.CertificateRequest) {
m.lastRequestLock.Lock()
defer m.lastRequestLock.Unlock()
return m.lastRequest
return m.lastRequestCancel, m.lastRequest
}
func (m *manager) setLastRequest(r *x509.CertificateRequest) {
func (m *manager) setLastRequest(cancel context.CancelFunc, r *x509.CertificateRequest) {
m.lastRequestLock.Lock()
defer m.lastRequestLock.Unlock()
m.lastRequestCancel = cancel
m.lastRequest = r
}

View File

@@ -82,7 +82,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
}
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
func WaitForCertificate(ctx context.Context, client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest) (certData []byte, err error) {
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@@ -94,8 +94,6 @@ func WaitForCertificate(client certificatesclient.CertificateSigningRequestInter
return client.Watch(options)
},
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
event, err := watchtools.UntilWithSync(
ctx,
lw,

View File

@@ -18,6 +18,7 @@ package workqueue
import (
"container/heap"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
@@ -66,6 +67,8 @@ type delayingType struct {
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once
// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker
@@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}
// ShutDown gives a way to shut off this queue
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
// on Get() will be true. This method may be invoked more than once.
func (q *delayingType) ShutDown() {
q.Interface.ShutDown()
close(q.stopCh)
q.heartbeat.Stop()
q.stopOnce.Do(func() {
q.Interface.ShutDown()
close(q.stopCh)
q.heartbeat.Stop()
})
}
// AddAfter adds the given item to the work queue after the given delay