Compare commits

...

30 Commits

Author SHA1 Message Date
Kubernetes Publisher
772a768f43 Merge pull request #100954 from saschagrunert/automated-cherry-pick-of-#99839-upstream-release-1.18
Automated cherry pick of #99839: Cleanup portforward streams after their usage

Kubernetes-commit: 1d2742f26c6a5ca4df6e3b48fce5f754baa90224
2021-05-18 10:43:43 +00:00
Kubernetes Publisher
fa3acefe68 Merge pull request #101179 from apelisse/cherrypick-smd-version
Pin sigs.k8s.io/structured-merge-diff/v3 at v3.0.1

Kubernetes-commit: a5d842bef97260d07eb54cc6e0ad57b64f6cd51b
2021-05-18 10:43:42 +00:00
Antoine Pelisse
00131ab7d1 Pin sigs.k8s.io/structured-merge-diff/v3 at v3.0.1
Kubernetes-commit: ecb1af7cce27dc31bc8a47a5fd5cf83fe7dac9c4
2021-04-15 20:53:19 -07:00
Kubernetes Publisher
08715c7129 Merge pull request #100514 from joelsmith/release-1.18
[1.18] Cherry pick of #98477: update gogo/protobuf to v1.3.2

Kubernetes-commit: 55c7f26659b8cf4ce1a424a4619197c2499e2247
2021-03-29 19:33:11 +00:00
Joel Smith
2dd6a2b6b8 update gogo/protobuf to v1.3.2
gogo/protobuf@v1.3.2 fixes https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-3121

Ref: https://github.com/kubernetes/client-go/issues/927

Kubernetes-commit: f54f378b2be6a7a2a1b34d8cefbba799126a05bb
2021-03-23 23:25:28 -06:00
Kubernetes Publisher
b994da18bc Merge pull request #100376 from liggitt/http2-1.18
Cherry-pick of #95981 to 1.18: enables the HTTP/2 health check

Kubernetes-commit: e680574e6e218b29fca89aac9d83ad82341c8ac4
2021-03-25 11:23:31 +00:00
Sascha Grunert
a52149042f Cleanup portforward streams after their usage
This implements stream cleanup for port forwarding. Before this patch, the
streams ([]httpstream.Stream) within `spdy/connection.go` would fill up with
every streaming request, which could result in heavy memory usage. Now the
stream identifier is used to keep track of streams so that they can be removed
once they are no longer needed (a minimal sketch of this pattern follows this
entry).

Signed-off-by: Sascha Grunert <sgrunert@redhat.com>

Kubernetes-commit: cccf49ce247c82c570b586be5438b5a7c24760d9
2021-03-05 13:39:04 +01:00
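
A minimal, self-contained Go sketch of the pattern this commit describes (the
connection and stream types below are hypothetical stand-ins, not the
spdy/connection.go implementation): streams are registered under their
identifier and deleted once their request is served, instead of accumulating
for the lifetime of the connection.

package main

import "fmt"

type stream struct{ id uint32 }

type connection struct {
	// streams is keyed by stream identifier so finished streams can be
	// located and dropped individually.
	streams map[uint32]*stream
}

func (c *connection) registerStream(s *stream) {
	c.streams[s.id] = s
}

// removeStreams mirrors the RemoveStreams idea from the patch: forget streams
// once they are no longer needed so the connection stops referencing them.
func (c *connection) removeStreams(ss ...*stream) {
	for _, s := range ss {
		delete(c.streams, s.id)
	}
}

func main() {
	c := &connection{streams: map[uint32]*stream{}}
	s := &stream{id: 1}
	c.registerStream(s)
	c.removeStreams(s)          // cleanup once the stream has served its request
	fmt.Println(len(c.streams)) // 0: nothing left to leak
}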
Chao Xu
02a8022313 Add a unit test verifying that the HTTP/2 health check helps the REST client detect broken TCP connections

Kubernetes-commit: 126c82fba468780f5db23847aefface0c81b83a5
2020-10-29 23:03:14 -07:00
Jordan Liggitt
9b6f62d95a update golang.org/x/net and golang.org/x/sys, equivalent of PR 96549
Kubernetes-commit: 95ceebc8f528f8990c3b91809ca7c50a7c24be93
2021-03-18 13:09:09 -04:00
Kubernetes Publisher
649866c409 Merge pull request #95354 from sfowl/automated-cherry-pick-of-#95316-upstream-release-1.18
Automated cherry pick of #95316: Mask bearer token in logs when logLevel >= 9

Kubernetes-commit: 16bf87fc78f1e51bdb65a67b8981279f43786eba
2020-12-14 17:14:07 +00:00
Kubernetes Publisher
3214b668d1 Merge pull request #95964 from dprotaso/automated-cherry-pick-of-#95939-upstream-release-1.18
Automated cherry pick of #95939: Address scenario where releasing a resource lock fails if a prior update fails or gets cancelled

Kubernetes-commit: f9925902620ae4c375b3f16e58794ae6a96d4cb6
2020-11-06 00:34:21 +00:00
Dave Protasowski
36ad760117 Re-add the event recorder in the release test
Previously, having a mock recorder would cause panics, since the lock would be
set to nil on update failures. Now the recorder uses the cached lock.

Kubernetes-commit: b7e3af7579fb9f79d3304ba78f73f80efde805ae
2020-10-27 22:45:33 -04:00
Dave Protasowski
89beb5b8df Don't clear the cached resourcelock when an error occurs on update
This allows the lock to be released normally, even with a potentially stale
lock. This flow should only occur when we are the lease holder (see the
sketch after this entry).

Kubernetes-commit: 04170f66529f98367bc0d91f9419962948f95188
2020-10-27 22:41:39 -04:00
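
A hedged sketch of the pattern this fix applies in the resourcelock
implementations further down (the names here are hypothetical; the real
methods are in the configmaplock.go, endpointslock.go, and leaselock.go hunks
below): the cached object is only overwritten on a successful update, so a
failed or cancelled update leaves the previous object available for a later
release.

package main

import (
	"context"
	"errors"
	"fmt"
)

type object struct{ resourceVersion string }

type apiClient struct{ fail bool }

func (c *apiClient) update(ctx context.Context, o *object) (*object, error) {
	if c.fail {
		return nil, errors.New("update failed")
	}
	return &object{resourceVersion: o.resourceVersion + "+1"}, nil
}

type lock struct {
	client *apiClient
	cached *object
}

// update only overwrites the cache with a known-good object; on error the
// last cached (possibly stale) object is kept so release() can still use it.
func (l *lock) update(ctx context.Context) error {
	obj, err := l.client.update(ctx, l.cached)
	if err != nil {
		return err
	}
	l.cached = obj
	return nil
}

func main() {
	l := &lock{client: &apiClient{fail: true}, cached: &object{resourceVersion: "1"}}
	_ = l.update(context.Background())
	fmt.Println(l.cached != nil) // true: the cache survives the failed update
}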
Dave Protasowski
2f053ea2ff Add failing test showing release is not working properly
Kubernetes-commit: 9988d73c9157cd5bb14dfaf0b8411d6c845df153
2020-10-27 22:29:22 -04:00
Kubernetes Publisher
c301108be7 Merge pull request #95927 from dprotaso/automated-cherry-pick-of-#80954-upstream-release-1.18
Automated cherry pick of #80954: Generate complete leader election record to resolve leader election issues with LeaseLocks

Kubernetes-commit: 3d7136e9c51f31df37e2f5151c226941be67f4ba
2020-10-28 08:21:57 -07:00
Sam Fowler
1b8383fc15 Mask bearer token in logs when logLevel >= 9
Kubernetes-commit: f5bf592fc0d60fe2f135bcea30a61b7ee6d8f2b9
2020-10-06 11:10:38 +10:00
Zachary Seguin
5053776c1e Add lease release tests in leader election
Kubernetes-commit: 9030fc47a24f630770b6966bc2b08153c57b4cc8
2020-04-22 18:52:44 -04:00
Zachary Seguin
228a223128 Generate complete leader election record to resolve leader election issues with LeaseLocks
Kubernetes-commit: dfa528ed0bd94cd603366d7a40fc880d4ab1af63
2019-08-03 17:40:03 -04:00
Kubernetes Publisher
0716e817fc Merge pull request #95619 from roycaihw/automated-cherry-pick-of-#95427-upstream-release-1.18
Automated cherry pick of #95427: don't cache transports for incomparable configs

Kubernetes-commit: 0036586379f5b9958017200798c4bb7ae7d85519
2020-10-16 05:41:25 -07:00
Haowei Cai
7e1165d068 don't cache transports for incomparable configs
Co-authored-by: Jordan Liggitt <liggitt@google.com>

Kubernetes-commit: 20b77693a2d12f7f8e75c66b7a751fd0613ca465
2020-10-07 15:44:27 -07:00
Kubernetes Publisher
4c2c810a97 Merge pull request #94426 from gobomb/automated-cherry-pick-of-#93646-origin-release-1.18
Automated cherry pick of #93646: let panics propagate up when processLoop panics

Kubernetes-commit: 9150263c8c7a42f3ae28e6e27a471d2697dfe5ed
2020-09-04 05:30:45 +00:00
Kubernetes Publisher
f3104ae88e Merge pull request #94430 from janeczku/automated-cherry-pick-of-#94316-upstream-release-1.18
Automated cherry pick of #94316: Fixed reflector not recovering from "Too large resource version" errors

Kubernetes-commit: abb70d04424848f39a771d04521822c81481b99b
2020-09-04 05:30:44 +00:00
Kubernetes Publisher
82b9b1a146 Merge pull request #94148 from liggitt/json-patch-1.18
[1.18] Update to json-patch 4.9.0

Kubernetes-commit: 62eff8d42cff51ddab6ef3c5b3f7380a5c02815f
2020-09-03 13:32:01 +00:00
janeczku
94230b73bd Fixed reflector not recovering from "Too large resource version" errors with API servers 1.17.0-1.18.5
Kubernetes-commit: 744dd65b388e9fc17f53072db35c38748a83e777
2020-08-28 21:17:27 +02:00
Jordan Liggitt
0b89f44804 Update json-patch to v4.9.0 tagged release
Kubernetes-commit: f28d1007a56fa23c1874b973fa0024eddffc6fac
2020-08-20 17:14:41 -04:00
Kubernetes Publisher
c8494005d1 Merge pull request #93811 from liggitt/json-patch-4.8.0-1.18
[1.18] Update to json-patch 4.8.0

Kubernetes-commit: 7914bac28a993d4493e0c1e31323bb9330dbdcf6
2020-08-12 05:30:03 +00:00
Jordan Liggitt
146c268590 Update to json-patch 4.8.0
Kubernetes-commit: b924a9330380fd993ad9ca4ebb171c1a67ecb51f
2020-08-08 09:59:16 -04:00
gobomb
ce358577b6 let panics propagate up when processLoop panics
Kubernetes-commit: 2474a2ebc8f953104b25b48c771596d57e918470
2020-08-03 09:30:39 +00:00
Kubernetes Publisher
9e5bcc7f62 Merge pull request #92688 from wojtek-t/automated-cherry-pick-of-#92537-upstream-release-1.18
Automated cherry pick of #92537 upstream release 1.18

Kubernetes-commit: 91ae197b9d0b13b8280992595d0556d118bf11dd
2020-07-09 05:07:02 +00:00
wojtekt
3ac0631eb9 Fix bug in reflector not recovering from "Too large resource version" errors
Kubernetes-commit: 8012722d626d50fd18059ec5af6d195a7dc180c1
2020-06-26 09:45:29 +02:00
17 changed files with 720 additions and 86 deletions

Godeps/Godeps.json (generated, 20 changed lines)

@@ -76,7 +76,7 @@
},
{
"ImportPath": "github.com/evanphx/json-patch",
"Rev": "v4.2.0"
"Rev": "v4.9.0"
},
{
"ImportPath": "github.com/fsnotify/fsnotify",
@@ -108,7 +108,7 @@
},
{
"ImportPath": "github.com/gogo/protobuf",
"Rev": "v1.3.1"
"Rev": "v1.3.2"
},
{
"ImportPath": "github.com/golang/glog",
@@ -188,7 +188,7 @@
},
{
"ImportPath": "github.com/kisielk/errcheck",
"Rev": "v1.2.0"
"Rev": "v1.5.0"
},
{
"ImportPath": "github.com/kisielk/gotool",
@@ -238,6 +238,10 @@
"ImportPath": "github.com/peterbourgon/diskv",
"Rev": "v2.0.1"
},
{
"ImportPath": "github.com/pkg/errors",
"Rev": "v0.8.1"
},
{
"ImportPath": "github.com/pmezard/go-difflib",
"Rev": "v1.0.0"
@@ -276,7 +280,7 @@
},
{
"ImportPath": "golang.org/x/net",
"Rev": "13f9640d40b9"
"Rev": "69a78807bb2b"
},
{
"ImportPath": "golang.org/x/oauth2",
@@ -288,7 +292,7 @@
},
{
"ImportPath": "golang.org/x/sys",
"Rev": "fde4db37ae7a"
"Rev": "5cba982894dd"
},
{
"ImportPath": "golang.org/x/text",
@@ -348,11 +352,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "1b2a9461f0df"
"Rev": "53468e23a787"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "d51655f4991f"
"Rev": "97edb8e5a7aa"
},
{
"ImportPath": "k8s.io/gengo",
@@ -372,7 +376,7 @@
},
{
"ImportPath": "sigs.k8s.io/structured-merge-diff/v3",
"Rev": "v3.0.0"
"Rev": "v3.0.1"
},
{
"ImportPath": "sigs.k8s.io/yaml",

go.mod (19 changed lines)

@@ -9,8 +9,8 @@ require (
github.com/Azure/go-autorest/autorest v0.9.0
github.com/Azure/go-autorest/autorest/adal v0.5.0
github.com/davecgh/go-spew v1.1.1
github.com/evanphx/json-patch v4.2.0+incompatible
github.com/gogo/protobuf v1.3.1
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/gogo/protobuf v1.3.2
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
@@ -23,21 +23,22 @@ require (
github.com/peterbourgon/diskv v2.0.1+incompatible
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.0.0-20200429122142-1b2a9461f0df
k8s.io/apimachinery v0.0.0-20200429161533-d51655f4991f
k8s.io/api v0.0.0-20210518101910-53468e23a787
k8s.io/apimachinery v0.0.0-20210518100739-97edb8e5a7aa
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
sigs.k8s.io/yaml v1.2.0
)
replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
golang.org/x/crypto => golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/text => golang.org/x/text v0.3.2
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
k8s.io/api => k8s.io/api v0.0.0-20200429122142-1b2a9461f0df
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200429161533-d51655f4991f
k8s.io/api => k8s.io/api v0.0.0-20210518101910-53468e23a787
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210518100739-97edb8e5a7aa
)

go.sum (40 changed lines)

@@ -30,8 +30,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -40,8 +40,8 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -83,7 +83,7 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46Ok=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -108,6 +108,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
@@ -119,8 +121,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 h1:/Tl7pH94bvbAAHBdZJT947M/+gp0+CqQXDtMRC0fseo=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -136,8 +136,8 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 h1:rjwSpXsdiK0dV8/Naq3kAw9ymfAeJIyd0upUIElB+lI=
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
@@ -146,11 +146,15 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd h1:5CtCZbICpIOFdgO940moixOPjc0178IU44m4EjOO5IY=
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -182,8 +186,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20200429122142-1b2a9461f0df/go.mod h1:3IQXHrU8AKFBlRNkA7Gg5ohimIUFCMZKj1udLSrDY6g=
k8s.io/apimachinery v0.0.0-20200429161533-d51655f4991f/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko=
k8s.io/api v0.0.0-20210518101910-53468e23a787/go.mod h1:LlUnNse6BQx63yz6yMZOGvzQcVws1/bcjOsbAi7uZPc=
k8s.io/apimachinery v0.0.0-20210518100739-97edb8e5a7aa/go.mod h1:70HIRzSveORLKbatTlXzI2B2UUhbWzbq8Vqyf+HbdUQ=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
@@ -194,8 +198,8 @@ k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6/go.mod h1:GRQhZsXIAJ1xR0C
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89 h1:d4vVOjXm687F1iLSP2q3lyPPuyvTUt3aVoBpi2DqRsU=
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=
sigs.k8s.io/structured-merge-diff/v3 v3.0.0 h1:dOmIZBMfhcHS09XZkMyUgkq5trg3/jRyJYFZUiaOp8E=
sigs.k8s.io/structured-merge-diff/v3 v3.0.0/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=
sigs.k8s.io/structured-merge-diff/v3 v3.0.1 h1:ISORLGKzslMY5RWkCSGNy5uDb3OHyEkGEhuSATvSp3A=
sigs.k8s.io/structured-merge-diff/v3 v3.0.1/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=

rest/connection_test.go (new file, 165 lines)

@@ -0,0 +1,165 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilnet "k8s.io/apimachinery/pkg/util/net"
)
type tcpLB struct {
t *testing.T
ln net.Listener
serverURL string
dials int32
}
func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) {
out, err := net.Dial("tcp", lb.serverURL)
if err != nil {
lb.t.Log(err)
return
}
go io.Copy(out, in)
go io.Copy(in, out)
<-stopCh
if err := out.Close(); err != nil {
lb.t.Fatalf("failed to close connection: %v", err)
}
}
func (lb *tcpLB) serve(stopCh chan struct{}) {
conn, err := lb.ln.Accept()
if err != nil {
lb.t.Fatalf("failed to accept: %v", err)
}
atomic.AddInt32(&lb.dials, 1)
go lb.handleConnection(conn, stopCh)
}
func newLB(t *testing.T, serverURL string) *tcpLB {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("failed to bind: %v", err)
}
lb := tcpLB{
serverURL: serverURL,
ln: ln,
t: t,
}
return &lb
}
func setEnv(key, value string) func() {
originalValue := os.Getenv(key)
os.Setenv(key, value)
return func() {
os.Setenv(key, originalValue)
}
}
const (
readIdleTimeout int = 1
pingTimeout int = 1
)
func TestReconnectBrokenTCP(t *testing.T) {
defer setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", strconv.Itoa(readIdleTimeout))()
defer setEnv("HTTP2_PING_TIMEOUT_SECONDS", strconv.Itoa(pingTimeout))()
defer setEnv("DISABLE_HTTP2", "")()
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %s", r.Proto)
}))
ts.TLS = &tls.Config{NextProtos: []string{"h2", "http/1.1"}}
ts.StartTLS()
defer ts.Close()
u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("failed to parse URL from %q: %v", ts.URL, err)
}
lb := newLB(t, u.Host)
defer lb.ln.Close()
stopCh := make(chan struct{})
go lb.serve(stopCh)
transport, ok := ts.Client().Transport.(*http.Transport)
if !ok {
t.Fatalf("failed to assert *http.Transport")
}
config := &Config{
Host: "https://" + lb.ln.Addr().String(),
Transport: utilnet.SetTransportDefaults(transport),
Timeout: 1 * time.Second,
// These fields are required to create a REST client.
ContentConfig: ContentConfig{
GroupVersion: &schema.GroupVersion{},
NegotiatedSerializer: &serializer.CodecFactory{},
},
}
client, err := RESTClientFor(config)
if err != nil {
t.Fatalf("failed to create REST client: %v", err)
}
data, err := client.Get().AbsPath("/").DoRaw(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %s: %v", data, err)
}
if string(data) != "Hello, HTTP/2.0" {
t.Fatalf("unexpected response: %s", data)
}
// Deliberately let the LB stop proxying traffic for the current
// connection. This mimics a broken TCP connection that's not properly
// closed.
close(stopCh)
stopCh = make(chan struct{})
go lb.serve(stopCh)
// Sleep enough time for the HTTP/2 health check to detect and close
// the broken TCP connection.
time.Sleep(time.Duration(1+readIdleTimeout+pingTimeout) * time.Second)
// If the HTTP/2 health check were disabled, the broken connection
// would still be in the connection pool, the following request would
// then reuse the broken connection instead of creating a new one, and
// thus would fail.
data, err = client.Get().AbsPath("/").DoRaw(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if string(data) != "Hello, HTTP/2.0" {
t.Fatalf("unexpected response: %s", data)
}
dials := atomic.LoadInt32(&lb.dials)
if dials != 2 {
t.Fatalf("expected %d dials, got %d", 2, dials)
}
}
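
The HTTP2_READ_IDLE_TIMEOUT_SECONDS and HTTP2_PING_TIMEOUT_SECONDS variables
set by this test are consulted when the transport defaults are applied
(utilnet.SetTransportDefaults above); underneath they map onto health-check
fields of golang.org/x/net/http2. A hedged sketch of configuring the same
behavior directly, assuming the ConfigureTransports API present in the x/net
revision vendored above:

package main

import (
	"net/http"
	"time"

	"golang.org/x/net/http2"
)

// newHealthCheckedTransport shows the two knobs the test exercises: if no
// frame arrives on a connection for ReadIdleTimeout, the client sends an
// HTTP/2 ping; if the ping is not answered within PingTimeout, the connection
// is closed, so the next request dials fresh instead of reusing a broken TCP
// connection.
func newHealthCheckedTransport() (*http.Transport, error) {
	tr := &http.Transport{}
	t2, err := http2.ConfigureTransports(tr)
	if err != nil {
		return nil, err
	}
	t2.ReadIdleTimeout = 1 * time.Second
	t2.PingTimeout = 1 * time.Second
	return tr, nil
}

func main() {
	if _, err := newHealthCheckedTransport(); err != nil {
		panic(err)
	}
}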

tools/cache/controller.go

@@ -138,11 +138,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
// Returns true once this controller has completed an initial resource listing

tools/cache/controller_test.go

@@ -402,3 +402,49 @@ func TestUpdate(t *testing.T) {
testDoneWG.Wait()
close(stop)
}
func TestPanicPropagated(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
// Make a controller that just panics if the AddFunc is called.
_, controller := NewInformer(
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// Create a panic.
panic("Just panic.")
},
},
)
// Run the controller until we close stop.
stop := make(chan struct{})
defer close(stop)
propagated := make(chan interface{})
go func() {
defer func() {
if r := recover(); r != nil {
propagated <- r
}
}()
controller.Run(stop)
}()
// Let's add an object to the source. It will trigger a panic.
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}})
// Check if the panic propagated up.
select {
case p := <-propagated:
if p == "Just panic." {
t.Logf("Test Passed")
} else {
t.Errorf("unrecognized panic in controller run: %v", p)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout: the panic failed to propagate from the controller run method!")
}
}

tools/cache/reflector.go

@@ -82,9 +82,9 @@ type Reflector struct {
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
// failed with an HTTP 410 (Gone) status code.
isLastSyncResourceVersionGone bool
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -256,13 +256,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true)
// Retry immediately if the resource version used to list is expired.
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, or the full list might fail because the
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
// to recover and ensure the reflector makes forward progress.
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
@@ -292,7 +293,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionExpired(false) // list was successful
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
@@ -396,7 +397,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
@@ -519,9 +520,9 @@ func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionGone {
if r.isLastSyncResourceVersionUnavailable {
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
@@ -533,12 +534,12 @@ func (r *Reflector) relistResourceVersion() string {
return r.lastSyncResourceVersion
}
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
// expired error: HTTP 410 (Gone) Status Code.
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// "expired" or "too large resource version" error.
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.isLastSyncResourceVersionGone = isExpired
r.isLastSyncResourceVersionUnavailable = isUnavailable
}
func isExpiredError(err error) bool {
@@ -548,3 +549,28 @@ func isExpiredError(err error) bool {
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
}
func isTooLargeResourceVersionError(err error) bool {
if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
return true
}
// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
// version is larger than the largest currently available resource version. To ensure backward
// compatibility with these server versions we also need to detect the error based on the content
// of the error message field.
if !apierrors.IsTimeout(err) {
return false
}
apierr, ok := err.(apierrors.APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return false
}
for _, cause := range apierr.Status().Details.Causes {
// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
if cause.Message == "Too large resource version" {
return true
}
}
return false
}

tools/cache/reflector_test.go

@@ -714,6 +714,70 @@ func TestReflectorFullListIfExpired(t *testing.T) {
}
}
func TestReflectorFullListIfTooLarge(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
switch options.ResourceVersion {
// initial list
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
// relist after the initial list
case "20":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
return nil, err
// relist after the initial list (covers the error format used in api server 1.17.0-1.18.5)
case "30":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: "Too large resource version"}}
return nil, err
// relist from etcd after "too large" error
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "30"}}, nil
default:
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
}
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
// Relist from the future version.
// This may happen, as watchcache is initialized from "current global etcd resource version"
// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
// may be synced to a different version and they will never converge.
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
// done we simply try to relist from now to avoid continuous errors on relists.
for i := 1; i <= 2; i++ {
// relist twice to cover the two variants of TooLargeResourceVersion api errors
stopCh = make(chan struct{})
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
}
expectedRVs := []string{"0", "20", "", "30", ""}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
}
}
func TestReflectorSetExpectedType(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{

tools/leaderelection/leaderelection.go

@@ -289,8 +289,12 @@ func (le *LeaderElector) release() bool {
if !le.IsLeader() {
return true
}
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaseDurationSeconds: 1,
RenewTime: now,
AcquireTime: now,
}
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
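
For context on why release() now writes a complete record (#80954, #95939):
the record is converted by the lock implementations, and for LeaseLock it
becomes a coordination/v1 LeaseSpec (see LeaderElectionRecordToLeaseSpec in
the leaselock.go hunk below), where zero-valued times and a zero duration do
not produce a valid lease. A hedged sketch of the released record, using the
real resourcelock field names:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// releaseRecord leaves HolderIdentity empty (nobody owns the lock) but sets
// the timestamps and a minimal one-second duration so the record remains
// valid and the lock is immediately re-acquirable.
func releaseRecord(transitions int) rl.LeaderElectionRecord {
	now := metav1.Now()
	return rl.LeaderElectionRecord{
		LeaderTransitions:    transitions,
		LeaseDurationSeconds: 1,
		AcquireTime:          now,
		RenewTime:            now,
	}
}

func main() {
	fmt.Printf("%+v\n", releaseRecord(0))
}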

tools/leaderelection/leaderelection_test.go

@@ -917,3 +917,284 @@ func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "configmapsleases")
}
func testReleaseLease(t *testing.T, objectType string) {
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedTime time.Time
reactors []Reactor
expectSuccess bool
transitionLeader bool
outHolder string
}{
{
name: "release acquired lock from no object",
reactors: []Reactor{
{
verb: "get",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
{
verb: "update",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
expectSuccess: true,
outHolder: "",
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
switch objectType {
case "endpoints":
lock = &rl.EndpointsLock{
EndpointsMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
}
case "configmaps":
lock = &rl.ConfigMapLock{
ConfigMapMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
}
case "leases":
lock = &rl.LeaseLock{
LeaseMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoordinationV1(),
}
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
}
observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if !le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
}
le.maybeReportTransition()
// Wait for a response to the leader transition, and add 1 so that we can track the final transition.
wg.Wait()
wg.Add(1)
if test.expectSuccess != le.release() {
t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("wrong number of api interactions")
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("leader should have transitioned but did not")
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("leader should not have transitioned but did")
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
}
})
}
}
// Will test leader election using endpoints as the resource
func TestReleaseLeaseEndpoints(t *testing.T) {
testReleaseLease(t, "endpoints")
}
// Will test leader election using configmaps as the resource
func TestReleaseLeaseConfigMaps(t *testing.T) {
testReleaseLease(t, "configmaps")
}
// Will test leader election using leases as the resource
func TestReleaseLeaseLeases(t *testing.T) {
testReleaseLease(t, "leases")
}
func TestReleaseOnCancellation_Endpoints(t *testing.T) {
testReleaseOnCancellation(t, "endpoints")
}
func TestReleaseOnCancellation_ConfigMaps(t *testing.T) {
testReleaseOnCancellation(t, "configmaps")
}
func TestReleaseOnCancellation_Leases(t *testing.T) {
testReleaseOnCancellation(t, "leases")
}
func testReleaseOnCancellation(t *testing.T, objectType string) {
var (
onNewLeader = make(chan struct{})
onRenewCalled = make(chan struct{})
onRenewResume = make(chan struct{})
onRelease = make(chan struct{})
lockObj runtime.Object
updates int
)
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
c := &fake.Clientset{}
c.AddReactor("get", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
if lockObj != nil {
return true, lockObj, nil
}
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
})
// create lock
c.AddReactor("create", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockObj = action.(fakeclient.CreateAction).GetObject()
return true, lockObj, nil
})
c.AddReactor("update", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
// Second update (first renew) should return our canceled error
// FakeClient doesn't do anything with the context so we're doing this ourselves
if updates == 2 {
close(onRenewCalled)
<-onRenewResume
return true, nil, context.Canceled
} else if updates == 3 {
close(onRelease)
}
lockObj = action.(fakeclient.UpdateAction).GetObject()
return true, lockObj, nil
})
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
if err != nil {
t.Fatal("resourcelock.New() = ", err)
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 2 * time.Second,
RetryPeriod: 1 * time.Second,
// This is what we're testing
ReleaseOnCancel: true,
Callbacks: LeaderCallbacks{
OnNewLeader: func(identity string) {},
OnStoppedLeading: func() {},
OnStartedLeading: func(context.Context) {
close(onNewLeader)
},
},
}
elector, err := NewLeaderElector(lec)
if err != nil {
t.Fatal("Failed to create leader elector: ", err)
}
ctx, cancel := context.WithCancel(context.Background())
go elector.Run(ctx)
// Wait for us to become the leader
select {
case <-onNewLeader:
case <-time.After(10 * time.Second):
t.Fatal("failed to become the leader")
}
// Wait for renew (update) to be invoked
select {
case <-onRenewCalled:
case <-time.After(10 * time.Second):
t.Fatal("the elector failed to renew the lock")
}
// Cancel the context - stopping the elector while
// it's running
cancel()
// Resume the update call to return the cancellation
// which should trigger the release flow
close(onRenewResume)
select {
case <-onRelease:
case <-time.After(10 * time.Second):
t.Fatal("the lock was not released")
}
}

tools/leaderelection/resourcelock/configmaplock.go

@@ -92,8 +92,12 @@ func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord)
cml.cm.Annotations = make(map[string]string)
}
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
return err
cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
if err != nil {
return err
}
cml.cm = cm
return nil
}
// RecordEvent in leader election while adding meta-data

tools/leaderelection/resourcelock/endpointslock.go

@@ -87,8 +87,12 @@ func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) e
el.e.Annotations = make(map[string]string)
}
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
return err
e, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
if err != nil {
return err
}
el.e = e
return nil
}
// RecordEvent in leader election while adding meta-data

tools/leaderelection/resourcelock/leaselock.go

@@ -71,9 +71,14 @@ func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error
return errors.New("lease not initialized, call get or create first")
}
ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
return err
lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
if err != nil {
return err
}
ll.lease = lease
return nil
}
// RecordEvent in leader election while adding meta-data

tools/portforward/portforward_test.go

@@ -68,6 +68,9 @@ func (c *fakeConnection) CloseChan() <-chan bool {
return c.closeChan
}
func (c *fakeConnection) RemoveStreams(_ ...httpstream.Stream) {
}
func (c *fakeConnection) SetIdleTimeout(timeout time.Duration) {
// no-op
}

transport/cache.go

@@ -47,10 +47,8 @@ type tlsCacheKey struct {
keyData string
certFile string
keyFile string
getCert string
serverName string
nextProtos string
dial string
disableCompression bool
}
@@ -59,22 +57,24 @@ func (t tlsCacheKey) String() string {
if len(t.keyData) > 0 {
keyText = "<redacted>"
}
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, getCert: %s, serverName:%s, dial:%s disableCompression:%t", t.insecure, t.caData, t.certData, keyText, t.getCert, t.serverName, t.dial, t.disableCompression)
return fmt.Sprintf("insecure:%v, caData:%#v, certData:%#v, keyData:%s, serverName:%s, disableCompression:%t", t.insecure, t.caData, t.certData, keyText, t.serverName, t.disableCompression)
}
func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
key, err := tlsConfigKey(config)
key, canCache, err := tlsConfigKey(config)
if err != nil {
return nil, err
}
// Ensure we only create a single transport for the given TLS options
c.mu.Lock()
defer c.mu.Unlock()
if canCache {
// Ensure we only create a single transport for the given TLS options
c.mu.Lock()
defer c.mu.Unlock()
// See if we already have a custom transport for this config
if t, ok := c.transports[key]; ok {
return t, nil
// See if we already have a custom transport for this config
if t, ok := c.transports[key]; ok {
return t, nil
}
}
// Get the TLS options for this client config
@@ -104,8 +104,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
go dynamicCertDialer.Run(wait.NeverStop)
}
// Cache a single transport for these options
c.transports[key] = utilnet.SetTransportDefaults(&http.Transport{
transport := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
@@ -113,22 +112,32 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
DialContext: dial,
DisableCompression: config.DisableCompression,
})
return c.transports[key], nil
if canCache {
// Cache a single transport for these options
c.transports[key] = transport
}
return transport, nil
}
// tlsConfigKey returns a unique key for tls.Config objects returned from TLSConfigFor
func tlsConfigKey(c *Config) (tlsCacheKey, error) {
func tlsConfigKey(c *Config) (tlsCacheKey, bool, error) {
// Make sure ca/key/cert content is loaded
if err := loadTLSFiles(c); err != nil {
return tlsCacheKey{}, err
return tlsCacheKey{}, false, err
}
if c.TLS.GetCert != nil || c.Dial != nil {
// cannot determine equality for functions
return tlsCacheKey{}, false, nil
}
k := tlsCacheKey{
insecure: c.TLS.Insecure,
caData: string(c.TLS.CAData),
getCert: fmt.Sprintf("%p", c.TLS.GetCert),
serverName: c.TLS.ServerName,
nextProtos: strings.Join(c.TLS.NextProtos, ","),
dial: fmt.Sprintf("%p", c.Dial),
disableCompression: c.DisableCompression,
}
@@ -140,5 +149,5 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) {
k.keyData = string(c.TLS.KeyData)
}
return k, nil
return k, true, nil
}
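
The canCache split above exists because Go function values are not
comparable, so a config carrying GetCert or Dial callbacks has no reliable
cache key; formatting the function pointer with %p (as the removed getCert
and dial key fields did) yields distinct keys even for closures with
identical behavior. A small illustration:

package main

import "fmt"

func main() {
	f := func() {}
	g := func() {}
	// Function values can only be compared to nil; uncommenting the next
	// line is a compile error, which is why such configs now bypass the
	// transport cache entirely.
	// _ = f == g
	fmt.Printf("%p %p\n", f, g) // two addresses, even though both do nothing
}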

transport/cache_test.go

@@ -36,16 +36,24 @@ func TestTLSConfigKey(t *testing.T) {
}
for nameA, valueA := range identicalConfigurations {
for nameB, valueB := range identicalConfigurations {
keyA, err := tlsConfigKey(valueA)
keyA, canCache, err := tlsConfigKey(valueA)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameA, err)
continue
}
keyB, err := tlsConfigKey(valueB)
if !canCache {
t.Errorf("Unexpected canCache=false")
continue
}
keyB, canCache, err := tlsConfigKey(valueB)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameB, err)
continue
}
if !canCache {
t.Errorf("Unexpected canCache=false")
continue
}
if keyA != keyB {
t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
@@ -131,12 +139,12 @@ func TestTLSConfigKey(t *testing.T) {
}
for nameA, valueA := range uniqueConfigurations {
for nameB, valueB := range uniqueConfigurations {
keyA, err := tlsConfigKey(valueA)
keyA, canCacheA, err := tlsConfigKey(valueA)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameA, err)
continue
}
keyB, err := tlsConfigKey(valueB)
keyB, canCacheB, err := tlsConfigKey(valueB)
if err != nil {
t.Errorf("Unexpected error for %q: %v", nameB, err)
continue
@@ -147,12 +155,17 @@ func TestTLSConfigKey(t *testing.T) {
if keyA != keyB {
t.Errorf("Expected identical cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
}
if canCacheA != canCacheB {
t.Errorf("Expected identical canCache %q and %q, got:\n\t%v\n\t%v", nameA, nameB, canCacheA, canCacheB)
}
continue
}
if keyA == keyB {
t.Errorf("Expected unique cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
if canCacheA && canCacheB {
if keyA == keyB {
t.Errorf("Expected unique cache keys for %q and %q, got:\n\t%s\n\t%s", nameA, nameB, keyA, keyB)
continue
}
}
}
}

rest/request.go

@@ -340,6 +340,7 @@ func (r *requestInfo) toCurl() string {
headers := ""
for key, values := range r.RequestHeaders {
for _, value := range values {
value = maskValue(key, value)
headers += fmt.Sprintf(` -H %q`, fmt.Sprintf("%s: %s", key, value))
}
}
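
maskValue is the helper introduced by #95316 to redact the Authorization
header before the curl command is logged at high verbosity; its body is not
part of this hunk. A hedged sketch of the idea (hypothetical implementation,
not the upstream one):

package main

import (
	"fmt"
	"strings"
)

// maskedValue keeps the auth scheme visible but hides the credential itself.
func maskedValue(key, value string) string {
	if !strings.EqualFold(key, "Authorization") {
		return value
	}
	if i := strings.IndexByte(value, ' '); i > 0 {
		return value[:i] + " <masked>" // e.g. "Bearer <masked>"
	}
	return "<masked>"
}

func main() {
	fmt.Println(maskedValue("Authorization", "Bearer s3cr3t-token"))
	fmt.Println(maskedValue("Accept", "application/json"))
}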