Compare commits

..

55 Commits

Author SHA1 Message Date
Kubernetes Publisher
6dd9e8fec7 Fix dependencies to point to kubernetes-1.16.3-beta.0 tag 2019-10-16 11:10:57 +00:00
Kubernetes Publisher
f06fe3961c Merge pull request #83433 from liggitt/automated-cherry-pick-of-#83261-upstream-release-1.16
[1.16] Automated cherry pick of #83261: bump gopkg.in/yaml.v2 v2.2.4

Kubernetes-commit: a2ff5d71deafff1d9688eb6f7533dd239980c420
2019-10-04 12:09:05 +00:00
Jordan Liggitt
3e20d70240 bump gopkg.in/yaml.v2 v2.2.4
Kubernetes-commit: 90a50158270ed5e7406386a12b12b2606c587ffd
2019-10-02 14:46:08 -04:00
Kubernetes Publisher
a39e818a09 Merge pull request #82618 from warmchang/automated-cherry-pick-of-#82446-upstream-release-1.16
Automated cherry pick of #82446: Check cache is synced first before sleeping

Kubernetes-commit: 98a472df01437952de1689730f4b211eb4b7b9c4
2019-09-29 04:01:53 +00:00
Darren Shepherd
6b59ea55de Check cache is synced first before sleeping
If a cache was already synced, cache.WaitForCacheSync would
always take 100ms to complete because the PollUntil method will
sleep first before checking the condition.  Switching to
PollImmediateUntil will ensure already synced caches will return
immediately.  For code that has, for example, 20 informers, the time
to check the cache was in sync would take at least 2 seconds, but with
this change it can be as fast as you can actually load the data.

Signed-off-by: Darren Shepherd <darren@rancher.com>

Kubernetes-commit: d2e0a25c2696aef4ea5445d9b856bad1ebec832b
2019-09-06 16:30:04 -07:00
Kubernetes Publisher
5c318c3197 Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 3f1a4438b1768aca647be6ad2966da54c231331e
2019-09-07 11:50:30 +00:00
Jordan Liggitt
77dac3396c Update client-go install instructions
Kubernetes-commit: df3d143d157c0558ba6b0f642b16697a774ecb94
2019-09-06 11:00:01 -04:00
Kubernetes Publisher
dd35e17ce2 Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 78981197d01e8524d254656095f3cb71a5851e61
2019-08-31 15:21:36 +00:00
Kubernetes Publisher
76b513d726 Merge pull request #77354 from jennybuckley/crd-apply
Use CRD validation field in server-side apply

Kubernetes-commit: ab162cd28c332d0ecfb4f918d5f91e9e57acdb61
2019-08-31 15:54:14 +00:00
Kubernetes Publisher
f98c928ade Merge pull request #82090 from liggitt/webhook-http2
Use http/1.1 for apiserver->webhook clients

Kubernetes-commit: f442b6ef320140730f544527597a140e535f1e1d
2019-08-31 15:54:13 +00:00
Kubernetes Publisher
d8bd9d957c Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 6c190d225f53551bc46db5b3442e95230ab6ecff
2019-08-30 08:27:58 +00:00
Kubernetes Publisher
88d1b77684 Merge pull request #81770 from Hyzhou/fix_link
Fix broken link to api-conventions doc.

Kubernetes-commit: a9f0db16614ae62563ead2018f1692407bd93d8f
2019-08-30 11:54:08 +00:00
Kubernetes Publisher
fc996b4e8f Merge pull request #81947 from RainbowMango/pr_cleanup_staticcheck_for_client_p1
Cleanup client-go static analysis issues-phase 1

Kubernetes-commit: b131b4b4caa8f47df867174ac1eff251c5ad8328
2019-08-30 11:54:07 +00:00
jennybuckley
c4765fa903 Update generated
Kubernetes-commit: badd5b9a26026138e4fc44a643ec1c6b65a7891b
2019-08-29 19:10:28 -07:00
Kubernetes Publisher
8666a3c212 Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 14761b020eadfaaf7864d39aca79c0fc089c609f
2019-08-29 12:32:50 +00:00
Kubernetes Publisher
7b17012442 Merge pull request #80766 from robscott/discovery-api
Adding Discovery API for EndpointSlice

Kubernetes-commit: 6c9f26ca3ab8656673f0f0b05097bdf7e8fbd911
2019-08-29 15:57:03 +00:00
misakazhou
9185f580bf Fix broken link to api-conventions doc.
Signed-off-by: misakazhou <misakazhou@tencent.com>

Kubernetes-commit: f0323a2030c7adae0e0965a7d3b455dd416472a0
2019-08-29 08:35:16 +08:00
Kubernetes Publisher
960df9c231 Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 1aea0dd9383a2f7ac9f7e055ff10a08ea241db51
2019-08-28 15:53:39 +00:00
Jordan Liggitt
072974b1c7 Plumb NextProtos to TLS client config, honor http/2 client preference
Kubernetes-commit: aef05c8dca2c1a9967ebd9a2f67a0bf7fb16f079
2019-08-28 09:55:37 -04:00
RainbowMango
56dbcf5ef6 Cleanup staticcheck issues for package in client-go.
Kubernetes-commit: c8c055b3163dd2661b3f9dd1b0ffb718a61aba24
2019-08-26 20:55:32 +08:00
Pavithra Ramesh
c1bd5124e1 Update vendor k8s-cloud-provider and google API
Ran commands:
hack/pin-dependency.sh github.com/GoogleCloudPlatform/k8s-cloud-provider 27a4ced34534a6c32b63159b100ac0efaa1d37b3
hack/update-vendor.sh

hack/pin-dependency.sh google.golang.org/api 5213b809086156e6e2b262a41394993fcff97439
hack/update-vendor.sh

hack/verify-vendor.sh

merge conflicts

Kubernetes-commit: ce3b145e7369da6c1179346a4e6f4f9992d235b7
2019-08-23 15:51:45 -07:00
Kubernetes Publisher
66e051383d Merge pull request #81527 from yastij/move-controller-util
move WaitForCacheSync to the sharedInformer package

Kubernetes-commit: 927f45191ef34012c890a17a4e77fdbe0f12de94
2019-08-28 15:53:38 +00:00
Kubernetes Publisher
cfbd47c887 Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 2e22e0cb6d1c1790a9927c547d64af729a628689
2019-08-27 05:04:53 +00:00
Kubernetes Publisher
69c62aaa87 Merge pull request #81889 from tedyu/ctrl-rm-refl-mutex
Utilize reflectorMutex in LastSyncResourceVersion

Kubernetes-commit: 9a5dc8f082cc7163e93f773d433e4719282837bb
2019-08-27 07:53:27 +00:00
Ted Yu
35c406afa6 Utilize reflectorMutex in LastSyncResourceVersion
Kubernetes-commit: 4f54538ae8ff7a4e6829bea1479194af87b7833c
2019-08-26 07:46:26 -07:00
Kubernetes Publisher
66eec82cba Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: ecd7e4cab747770f5c362ace9bd3487bdb97c66b
2019-08-26 04:48:06 +00:00
Kubernetes Publisher
8d86f340d3 Merge pull request #81507 from eloyekunle/fix/scale-client
update scale client to support non-namespaced resources

Kubernetes-commit: a118e59912ebf3fc6bd79e580048ae6fac7a7431
2019-08-26 11:59:55 +00:00
Kubernetes Publisher
7e45b45e87 Merge pull request #81780 from yastij/fix-nil-panic
check that the recorded event is not nil on refreshExistingEventSeries

Kubernetes-commit: ab7c4a7c3767447cfdee10dc59996836858f764f
2019-08-26 11:59:54 +00:00
Kubernetes Publisher
e0d2c14f19 Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 8d7df36242af72fe3777e3710751f6b99ce23558
2019-08-23 12:10:56 +00:00
Kubernetes Publisher
de5739d200 Merge pull request #81768 from wojtek-t/cleanup_serializers
Remove dead code

Kubernetes-commit: 2cbe49e84925b44803c451d3ab48dabf9d5e7973
2019-08-26 11:59:51 +00:00
Kubernetes Publisher
7fbd0083d9 Merge pull request #81634 from likakuli/fix_errno
fixes a bug that connection refused error cannot be recognized correctly

Kubernetes-commit: 22a8bcf30af52d87a0289bb705ff5c41aaf6f72a
2019-08-26 11:59:49 +00:00
Elijah Oyekunle
d768c4d3c9 update scale client to support non-namespaced resources
Kubernetes-commit: fcd44b44cadef56e9782dcc875f5572cbc43b018
2019-08-23 10:50:45 +01:00
Yassine TIJANI
d975a76928 check that the recorded event is not nil on refreshExistingEventSeries
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>

Kubernetes-commit: 000c2c557f862270ec83341c1af3fcd7c6f7d4a2
2019-08-22 11:56:35 +01:00
wojtekt
c04d72efe3 Remove dead code
Kubernetes-commit: b1675717168826c3314bf5b7e28c282f9643eadf
2019-08-22 08:58:45 +02:00
Kubernetes Publisher
e484c9e80a Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 7501d5a6edd50dbaa72faffc434ba0ea0a19c33e
2019-08-21 10:24:00 -04:00
Kubernetes Publisher
e4af74985a Merge pull request #71757 from mikedanese/fixcancel
implement request cancellation in token transport

Kubernetes-commit: bc1d8c6d61078269540f807248a455cba09e672c
2019-08-22 05:47:51 +00:00
Kubernetes Publisher
2597db3f9d Merge remote-tracking branch 'origin/master' into release-1.16. Deleting CHANGELOG-1.13.md CHANGELOG-1.14.md CHANGELOG-1.15.md
Kubernetes-commit: 92639f719ee353219fa2cd19128a93409bb3a1e3
2019-08-20 10:20:12 +00:00
likakuli
dcdb50aa6e fixes a bug that connection refused error cannot be recognized correctly
Kubernetes-commit: 3cec9098020ecc168573c7ee658282954dac2a2e
2019-08-20 12:18:46 +08:00
Yassine TIJANI
ee2e9c29c1 move WaitForCacheSync to the sharedInformer package
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>

Kubernetes-commit: 7e4c3096fe71afc6a23c273b3309ed5db7289d8c
2019-08-16 21:15:53 +02:00
Tim Allclair
be9bc239ba Bump golang.org/x/tools version for staticcheck compat
Kubernetes-commit: 9a02ef7fe5eb166a53131c52b59844179c4158e9
2019-08-08 11:04:56 -07:00
Kubernetes Publisher
a240a565b0 Merge remote-tracking branch 'origin/master' into release-1.16
Kubernetes-commit: 4771a93d817479e76263c16473d18622f40f7c32
2019-08-16 10:16:47 +00:00
Stephen Augustus
fc8f647b3e Update Azure/azure-sdk-for-go and Azure/go-autorest modules
Signed-off-by: Stephen Augustus <saugustus@vmware.com>

Kubernetes-commit: db855f9ba0ced2303fbb3b9776dd2a0d78b31897
2019-08-15 17:57:49 -04:00
Kubernetes Publisher
e57c476f29 Merge pull request #81394 from cblecker/golang-deps
Update golang/x/net dependency

Kubernetes-commit: 1f6cb3cb9def97320a5412dcbea1661edd95c29e
2019-08-16 10:16:45 +00:00
Kubernetes Publisher
e1fc35c323 Merge pull request #81331 from tariq1890/tst_typo
[k8s.io/client-go]fix typo in eventseries_test.go method

Kubernetes-commit: a520302fb4673e595fcb70d2a4db26598371be92
2019-08-16 10:16:44 +00:00
Kubernetes Publisher
98353c3b1f Merge pull request #81330 from tedyu/hide-auth-hdr
Hide bearer token in logs

Kubernetes-commit: 4441f1d9c3e94d9a3d93b4f184a591cab02a5245
2019-08-16 10:16:42 +00:00
Kubernetes Publisher
48ec506cd3 Merge pull request #81305 from tedyu/evt-patch-err
Error code incorrectly hidden in recordEvent

Kubernetes-commit: f22b67dd8f9f4c1ce7434a6432e4f952ef36ea32
2019-08-16 10:16:41 +00:00
Kubernetes Publisher
2e05e8a827 Merge pull request #81164 from dims/update-to-latest-klog-0.4.0
Update to latest klog 0.4.0

Kubernetes-commit: 461b2d8b9ae818b4b08a6c74d2ec6048fe7a52b6
2019-08-16 10:16:40 +00:00
Christoph Blecker
777a8ca82e Update vendor
Kubernetes-commit: 5f971d6d8862de319edbd24a729a704292c560dc
2019-08-13 17:51:45 -07:00
Christoph Blecker
81d21c0363 Pin golang.org/x/net to cdfb69a
Kubernetes-commit: ebadc53f50219112c3d80039de7b13779f9d4f63
2019-08-13 17:45:53 -07:00
Ted Yu
93c325cd95 Hide bearer token in logs
Kubernetes-commit: 010d8382642119c73cb2405286b347c08d704287
2019-08-13 09:36:13 -07:00
Tariq Ibrahim
c3eee973ab Address review comments
Kubernetes-commit: 805640382339f646a175f04f9e4846d5bf2ef61f
2019-08-13 09:07:39 -07:00
tariqibrahim
c46c59360c [k8s.io/client-go]fix typo in eventseries_test.go method
Kubernetes-commit: 06e1b38768ba243d29dcc6a3fe51381d2624a62f
2019-08-12 21:16:23 -07:00
Ted Yu
7d84cadb4f Error code incorrectly hidden in recordEvent
Kubernetes-commit: 31f374f42a2c816c8fb928c5db46474f5fc24b31
2019-08-12 12:11:37 -07:00
Rob Scott
9d95539429 Adding discovery/v1alpha1 API for EndpointSlices
Kubernetes-commit: f80cee928040ad458fbced70392063e0a5d160e5
2019-07-30 12:48:34 -07:00
Mike Danese
5a0ab6a74c implement request cancellation in token transport
Kubernetes-commit: a42e029e6905bee5b9d5489610c4fbe5988eeac6
2018-12-05 12:36:48 -08:00
54 changed files with 268 additions and 1321 deletions

12
Godeps/Godeps.json generated
View File

@@ -228,11 +228,11 @@
},
{
"ImportPath": "github.com/onsi/ginkgo",
"Rev": "v1.10.1"
"Rev": "v1.8.0"
},
{
"ImportPath": "github.com/onsi/gomega",
"Rev": "v1.7.0"
"Rev": "v1.5.0"
},
{
"ImportPath": "github.com/peterbourgon/diskv",
@@ -344,11 +344,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "35675025e55c"
"Rev": "9d520059cbbd"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "fb3eea214746"
"Rev": "a2eda9f80ab8"
},
{
"ImportPath": "k8s.io/gengo",
@@ -356,7 +356,7 @@
},
{
"ImportPath": "k8s.io/klog",
"Rev": "v1.0.0"
"Rev": "v0.4.0"
},
{
"ImportPath": "k8s.io/kube-openapi",
@@ -364,7 +364,7 @@
},
{
"ImportPath": "k8s.io/utils",
"Rev": "8d271d903fe4"
"Rev": "581e00157fb1"
},
{
"ImportPath": "sigs.k8s.io/structured-merge-diff",

View File

@@ -2,9 +2,9 @@
Go clients for talking to a [kubernetes](http://kubernetes.io/) cluster.
We recommend using the `kubernetes-1.x.y` tag matching the current Kubernetes release (`kubernetes-1.15.3` at the time this was written).
See [INSTALL.md](/INSTALL.md) for detailed installation instructions.
`go get k8s.io/client-go@master` works, but will fetch `master`, which may be less stable than a tagged release.
We currently recommend using the v12.0.0 tag. See [INSTALL.md](/INSTALL.md) for
detailed installation instructions. `go get k8s.io/client-go/...` works, but
will build `master`, which doesn't handle the dependencies well.
[![BuildStatus Widget]][BuildStatus Result]
[![GoReport Widget]][GoReport Status]
@@ -164,8 +164,8 @@ This repository is still a mirror of
the code development is still done in the staging area. Since Kubernetes 1.8
release, when syncing the code from the staging area, we also sync the Kubernetes
version tags to client-go, prefixed with "kubernetes-". For example, if you check
out the `kubernetes-1.15.3` tag in client-go, the code you get is exactly the
same as if you check out the `v1.15.3` tag in Kubernetes, and change directory to
out the `kubernetes-v1.8.0` tag in client-go, the code you get is exactly the
same as if you check out the `v1.8.0` tag in kubernetes, and change directory to
`staging/src/k8s.io/client-go`. The purpose is to let users quickly find matching
commits among published repos, like
[sample-apiserver](https://github.com/kubernetes/sample-apiserver),
@@ -176,13 +176,10 @@ you care about backwards compatibility.
### How to get it
Use go1.11+ and fetch the desired version using the `go get` command. For example:
```
go get k8s.io/client-go@kubernetes-1.15.3
```
See [INSTALL.md](/INSTALL.md) for detailed instructions.
You can use `go get k8s.io/client-go/...` to get client-go, but **you will get
the unstable master branch** and `client-go`'s vendored dependencies will not be
added to your `$GOPATH`. So we think most users will want to use a dependency
management system. See [INSTALL.md](/INSTALL.md) for detailed instructions.
### How to use it

View File

@@ -33,7 +33,7 @@ import (
"k8s.io/client-go/util/retry"
//
// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"
// _ "k8s.io/client-go/plugin/pkg/client/auth
//
// Or uncomment to load specific auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth/azure"

View File

@@ -34,7 +34,7 @@ import (
"k8s.io/client-go/util/retry"
//
// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"
// _ "k8s.io/client-go/plugin/pkg/client/auth
//
// Or uncomment to load specific auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth/azure"

View File

@@ -27,7 +27,7 @@ import (
"k8s.io/client-go/rest"
//
// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"
// _ "k8s.io/client-go/plugin/pkg/client/auth
//
// Or uncomment to load specific auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth/azure"
@@ -48,8 +48,6 @@ func main() {
panic(err.Error())
}
for {
// get pods in all the namespaces by omitting namespace
// Or specify namespace to get pods in particular namespace
pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
panic(err.Error())
@@ -57,17 +55,17 @@ func main() {
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
// Examples for error handling:
// - Use helper functions e.g. errors.IsNotFound()
// - Use helper functions like e.g. errors.IsNotFound()
// - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
_, err = clientset.CoreV1().Pods("default").Get("example-xxxxx", metav1.GetOptions{})
if errors.IsNotFound(err) {
fmt.Printf("Pod example-xxxxx not found in default namespace\n")
fmt.Printf("Pod not found\n")
} else if statusError, isStatus := err.(*errors.StatusError); isStatus {
fmt.Printf("Error getting pod %v\n", statusError.ErrStatus.Message)
} else if err != nil {
panic(err.Error())
} else {
fmt.Printf("Found example-xxxxx pod in default namespace\n")
fmt.Printf("Found pod\n")
}
time.Sleep(10 * time.Second)

View File

@@ -7,16 +7,15 @@ This example demonstrates how to use the leader election package.
Run the following three commands in separate terminals. Each terminal needs a unique `id`.
```bash
# first terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1
# first terminal
go run *.go -kubeconfig=/my/config -logtostderr=true -id=1
# second terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2
# second terminal
go run *.go -kubeconfig=/my/config -logtostderr=true -id=2
# third terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3
go run *.go -kubeconfig=/my/config -logtostderr=true -id=3
```
> You can ignore the `-kubeconfig` flag if you are running these commands in the Kubernetes cluster.
Now kill the existing leader. You will see from the terminal outputs that one of the remaining two processes will be elected as the new leader.

View File

@@ -19,18 +19,21 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
"k8s.io/klog"
)
@@ -59,16 +62,13 @@ func main() {
var id string
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
flag.StringVar(&id, "id", "", "the holder identity name")
flag.StringVar(&leaseLockName, "lease-lock-name", "example", "the lease lock resource name")
flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "default", "the lease lock resource namespace")
flag.Parse()
if leaseLockName == "" {
klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
}
if leaseLockNamespace == "" {
klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
if id == "" {
klog.Fatal("unable to get id (missing id flag).")
}
// leader election uses the Kubernetes API by writing to a
@@ -82,29 +82,6 @@ func main() {
}
client := clientset.NewForConfigOrDie(config)
run := func(ctx context.Context) {
// complete your controller loop here
klog.Info("Controller loop...")
select {}
}
// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()
// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
@@ -118,6 +95,25 @@ func main() {
},
}
// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// use a client that will stop allowing new requests once the context ends
config.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))
// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
log.Printf("Received termination, signaling shutdown")
cancel()
}()
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
@@ -135,12 +131,12 @@ func main() {
OnStartedLeading: func(ctx context.Context) {
// we're notified when we start - this is where you would
// usually put your code
run(ctx)
klog.Infof("%s: leading", id)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", id)
os.Exit(0)
// we can do cleanup here, or after the RunOrDie method
// returns
klog.Infof("%s: lost", id)
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
@@ -148,8 +144,18 @@ func main() {
// I just got the lock
return
}
klog.Infof("new leader elected: %s", identity)
klog.Infof("new leader elected: %v", identity)
},
},
})
// because the context is closed, the client should report errors
_, err = client.CoordinationV1().Leases(leaseLockNamespace).Get(leaseLockName, metav1.GetOptions{})
if err == nil || !strings.Contains(err.Error(), "the leader is shutting down") {
log.Fatalf("%s: expected to get an error when trying to make a client call: %v", id, err)
}
// we no longer hold the lease, so perform any cleanup and then
// exit
log.Printf("%s: done", id)
}

13
go.mod
View File

@@ -14,7 +14,6 @@ require (
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/protobuf v1.3.1
github.com/google/gofuzz v1.0.0
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d
github.com/gophercloud/gophercloud v0.1.0
github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7
@@ -27,10 +26,10 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.0.0-20191015145727-35675025e55c
k8s.io/apimachinery v0.0.0-20191014065749-fb3eea214746
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20191010214722-8d271d903fe4
k8s.io/api v0.0.0-20191016110406-9d520059cbbd
k8s.io/apimachinery v0.0.0-20191004115801-a2eda9f80ab8
k8s.io/klog v0.4.0
k8s.io/utils v0.0.0-20190801114015-581e00157fb1
sigs.k8s.io/yaml v1.1.0
)
@@ -42,6 +41,6 @@ replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503
golang.org/x/text => golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db
golang.org/x/time => golang.org/x/time v0.0.0-20161028155119-f51c12702a4d
k8s.io/api => k8s.io/api v0.0.0-20191015145727-35675025e55c
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20191014065749-fb3eea214746
k8s.io/api => k8s.io/api v0.0.0-20191016110406-9d520059cbbd
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20191004115801-a2eda9f80ab8
)

21
go.sum
View File

@@ -64,7 +64,6 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
@@ -105,11 +104,11 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -180,17 +179,17 @@ gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20191015145727-35675025e55c/go.mod h1:QkEbiSvETjiF869vsgCmJAUDD1VVz6Ut0ULyQP6ivY4=
k8s.io/apimachinery v0.0.0-20191014065749-fb3eea214746/go.mod h1:92mWDd8Ji2sw2157KIgino5wCxffA8KSvhW2oY4ypdw=
k8s.io/api v0.0.0-20191016110406-9d520059cbbd/go.mod h1:/L5qH+AD540e7Cetbui1tuJeXdmNhO8jM6VkXeDdDhQ=
k8s.io/apimachinery v0.0.0-20191004115801-a2eda9f80ab8/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog v0.4.0 h1:lCJCxf/LIowc2IGS9TPjWDyXY4nOmdGdfcwwDQCOURQ=
k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ=
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/utils v0.0.0-20191010214722-8d271d903fe4 h1:Gi+/O1saihwDqnlmC8Vhv1M5Sp4+rbOmK9TbsLn8ZEA=
k8s.io/utils v0.0.0-20191010214722-8d271d903fe4/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1 h1:+ySTxfHnfzZb9ys375PXNlLhkJPLKgHajBU0N62BDvE=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authenticationapi "k8s.io/api/authentication/v1"
core "k8s.io/client-go/testing"
)
func (c *FakeTokenReviews) Create(tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
return c.CreateContext(context.Background(), tokenReview)
}
func (c *FakeTokenReviews) CreateContext(ctx context.Context, tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authenticationapi.SchemeGroupVersion.WithResource("tokenreviews"), tokenReview), &authenticationapi.TokenReview{})
return obj.(*authenticationapi.TokenReview), err
}

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1
import (
"context"
authenticationapi "k8s.io/api/authentication/v1"
)
type TokenReviewExpansion interface {
Create(tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error)
CreateContext(ctx context.Context, tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error)
}
func (c *tokenReviews) Create(tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
return c.CreateContext(context.Background(), tokenReview)
}
func (c *tokenReviews) CreateContext(ctx context.Context, tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
result = &authenticationapi.TokenReview{}
err = c.client.Post().
Context(ctx).
Resource("tokenreviews").
Body(tokenReview).
Do().

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authenticationapi "k8s.io/api/authentication/v1beta1"
core "k8s.io/client-go/testing"
)
func (c *FakeTokenReviews) Create(tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
return c.CreateContext(context.Background(), tokenReview)
}
func (c *FakeTokenReviews) CreateContext(ctx context.Context, tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authenticationapi.SchemeGroupVersion.WithResource("tokenreviews"), tokenReview), &authenticationapi.TokenReview{})
return obj.(*authenticationapi.TokenReview), err
}

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1beta1
import (
"context"
authenticationapi "k8s.io/api/authentication/v1beta1"
)
type TokenReviewExpansion interface {
Create(tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error)
CreateContext(ctx context.Context, tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error)
}
func (c *tokenReviews) Create(tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
return c.CreateContext(context.Background(), tokenReview)
}
func (c *tokenReviews) CreateContext(ctx context.Context, tokenReview *authenticationapi.TokenReview) (result *authenticationapi.TokenReview, err error) {
result = &authenticationapi.TokenReview{}
err = c.client.Post().
Context(ctx).
Resource("tokenreviews").
Body(tokenReview).
Do().

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
core "k8s.io/client-go/testing"
)
func (c *FakeLocalSubjectAccessReviews) Create(sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *FakeLocalSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
obj, err := c.Fake.Invokes(core.NewCreateAction(authorizationapi.SchemeGroupVersion.WithResource("localsubjectaccessreviews"), c.ns, sar), &authorizationapi.SubjectAccessReview{})
return obj.(*authorizationapi.LocalSubjectAccessReview), err
}

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
core "k8s.io/client-go/testing"
)
func (c *FakeSelfSubjectAccessReviews) Create(sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *FakeSelfSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authorizationapi.SchemeGroupVersion.WithResource("selfsubjectaccessreviews"), sar), &authorizationapi.SelfSubjectAccessReview{})
return obj.(*authorizationapi.SelfSubjectAccessReview), err
}

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
core "k8s.io/client-go/testing"
)
func (c *FakeSelfSubjectRulesReviews) Create(srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
return c.CreateContext(context.Background(), srr)
}
func (c *FakeSelfSubjectRulesReviews) CreateContext(ctx context.Context, srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authorizationapi.SchemeGroupVersion.WithResource("selfsubjectrulesreviews"), srr), &authorizationapi.SelfSubjectRulesReview{})
return obj.(*authorizationapi.SelfSubjectRulesReview), err
}

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
core "k8s.io/client-go/testing"
)
func (c *FakeSubjectAccessReviews) Create(sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *FakeSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authorizationapi.SchemeGroupVersion.WithResource("subjectaccessreviews"), sar), &authorizationapi.SubjectAccessReview{})
if obj == nil {
return nil, err

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
)
type LocalSubjectAccessReviewExpansion interface {
Create(sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error)
CreateContext(ctx context.Context, sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error)
}
func (c *localSubjectAccessReviews) Create(sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *localSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
result = &authorizationapi.LocalSubjectAccessReview{}
err = c.client.Post().
Context(ctx).
Namespace(c.ns).
Resource("localsubjectaccessreviews").
Body(sar).

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
)
type SelfSubjectAccessReviewExpansion interface {
Create(sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error)
CreateContext(ctx context.Context, sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error)
}
func (c *selfSubjectAccessReviews) Create(sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *selfSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
result = &authorizationapi.SelfSubjectAccessReview{}
err = c.client.Post().
Context(ctx).
Resource("selfsubjectaccessreviews").
Body(sar).
Do().

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
)
type SelfSubjectRulesReviewExpansion interface {
Create(srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error)
CreateContext(ctx context.Context, srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error)
}
func (c *selfSubjectRulesReviews) Create(srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
return c.CreateContext(context.Background(), srr)
}
func (c *selfSubjectRulesReviews) CreateContext(ctx context.Context, srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
result = &authorizationapi.SelfSubjectRulesReview{}
err = c.client.Post().
Context(ctx).
Resource("selfsubjectrulesreviews").
Body(srr).
Do().

View File

@@ -17,25 +17,17 @@ limitations under the License.
package v1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1"
)
// The SubjectAccessReviewExpansion interface allows manually adding extra methods to the AuthorizationInterface.
type SubjectAccessReviewExpansion interface {
Create(sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error)
CreateContext(ctx context.Context, sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error)
}
func (c *subjectAccessReviews) Create(sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *subjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
result = &authorizationapi.SubjectAccessReview{}
err = c.client.Post().
Context(ctx).
Resource("subjectaccessreviews").
Body(sar).
Do().

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
core "k8s.io/client-go/testing"
)
func (c *FakeLocalSubjectAccessReviews) Create(sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *FakeLocalSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
obj, err := c.Fake.Invokes(core.NewCreateAction(authorizationapi.SchemeGroupVersion.WithResource("localsubjectaccessreviews"), c.ns, sar), &authorizationapi.SubjectAccessReview{})
return obj.(*authorizationapi.LocalSubjectAccessReview), err
}

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
core "k8s.io/client-go/testing"
)
func (c *FakeSelfSubjectAccessReviews) Create(sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *FakeSelfSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authorizationapi.SchemeGroupVersion.WithResource("selfsubjectaccessreviews"), sar), &authorizationapi.SelfSubjectAccessReview{})
return obj.(*authorizationapi.SelfSubjectAccessReview), err
}

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
core "k8s.io/client-go/testing"
)
func (c *FakeSelfSubjectRulesReviews) Create(srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
return c.CreateContext(context.Background(), srr)
}
func (c *FakeSelfSubjectRulesReviews) CreateContext(ctx context.Context, srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authorizationapi.SchemeGroupVersion.WithResource("selfsubjectrulesreviews"), srr), &authorizationapi.SelfSubjectRulesReview{})
return obj.(*authorizationapi.SelfSubjectRulesReview), err
}

View File

@@ -17,17 +17,11 @@ limitations under the License.
package fake
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
core "k8s.io/client-go/testing"
)
func (c *FakeSubjectAccessReviews) Create(sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *FakeSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
obj, err := c.Fake.Invokes(core.NewRootCreateAction(authorizationapi.SchemeGroupVersion.WithResource("subjectaccessreviews"), sar), &authorizationapi.SubjectAccessReview{})
return obj.(*authorizationapi.SubjectAccessReview), err
}

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1beta1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
)
type LocalSubjectAccessReviewExpansion interface {
Create(sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error)
CreateContext(ctx context.Context, sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error)
}
func (c *localSubjectAccessReviews) Create(sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *localSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.LocalSubjectAccessReview) (result *authorizationapi.LocalSubjectAccessReview, err error) {
result = &authorizationapi.LocalSubjectAccessReview{}
err = c.client.Post().
Context(ctx).
Namespace(c.ns).
Resource("localsubjectaccessreviews").
Body(sar).

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1beta1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
)
type SelfSubjectAccessReviewExpansion interface {
Create(sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error)
CreateContext(ctx context.Context, sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error)
}
func (c *selfSubjectAccessReviews) Create(sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *selfSubjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SelfSubjectAccessReview) (result *authorizationapi.SelfSubjectAccessReview, err error) {
result = &authorizationapi.SelfSubjectAccessReview{}
err = c.client.Post().
Context(ctx).
Resource("selfsubjectaccessreviews").
Body(sar).
Do().

View File

@@ -17,24 +17,16 @@ limitations under the License.
package v1beta1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
)
type SelfSubjectRulesReviewExpansion interface {
Create(srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error)
CreateContext(ctx context.Context, srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error)
}
func (c *selfSubjectRulesReviews) Create(srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
return c.CreateContext(context.Background(), srr)
}
func (c *selfSubjectRulesReviews) CreateContext(ctx context.Context, srr *authorizationapi.SelfSubjectRulesReview) (result *authorizationapi.SelfSubjectRulesReview, err error) {
result = &authorizationapi.SelfSubjectRulesReview{}
err = c.client.Post().
Context(ctx).
Resource("selfsubjectrulesreviews").
Body(srr).
Do().

View File

@@ -17,25 +17,17 @@ limitations under the License.
package v1beta1
import (
"context"
authorizationapi "k8s.io/api/authorization/v1beta1"
)
// The SubjectAccessReviewExpansion interface allows manually adding extra methods to the AuthorizationInterface.
type SubjectAccessReviewExpansion interface {
Create(sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error)
CreateContext(ctx context.Context, sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error)
}
func (c *subjectAccessReviews) Create(sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
return c.CreateContext(context.Background(), sar)
}
func (c *subjectAccessReviews) CreateContext(ctx context.Context, sar *authorizationapi.SubjectAccessReview) (result *authorizationapi.SubjectAccessReview, err error) {
result = &authorizationapi.SubjectAccessReview{}
err = c.client.Post().
Context(ctx).
Resource("subjectaccessreviews").
Body(sar).
Do().

View File

@@ -104,17 +104,17 @@ func (e *events) Search(scheme *runtime.Scheme, objOrRef runtime.Object) (*v1.Ev
if err != nil {
return nil, err
}
if len(e.ns) > 0 && ref.Namespace != e.ns {
if e.ns != "" && ref.Namespace != e.ns {
return nil, fmt.Errorf("won't be able to find any events of namespace '%v' in namespace '%v'", ref.Namespace, e.ns)
}
stringRefKind := string(ref.Kind)
var refKind *string
if len(stringRefKind) > 0 {
if stringRefKind != "" {
refKind = &stringRefKind
}
stringRefUID := string(ref.UID)
var refUID *string
if len(stringRefUID) > 0 {
if stringRefUID != "" {
refUID = &stringRefUID
}
fieldSelector := e.GetFieldSelector(&ref.Name, &ref.Namespace, refKind, refUID)
@@ -124,9 +124,10 @@ func (e *events) Search(scheme *runtime.Scheme, objOrRef runtime.Object) (*v1.Ev
// Returns the appropriate field selector based on the API version being used to communicate with the server.
// The returned field selector can be used with List and Watch to filter desired events.
func (e *events) GetFieldSelector(involvedObjectName, involvedObjectNamespace, involvedObjectKind, involvedObjectUID *string) fields.Selector {
apiVersion := e.client.APIVersion().String()
field := fields.Set{}
if involvedObjectName != nil {
field["involvedObject.name"] = *involvedObjectName
field[GetInvolvedObjectNameFieldLabel(apiVersion)] = *involvedObjectName
}
if involvedObjectNamespace != nil {
field["involvedObject.namespace"] = *involvedObjectNamespace
@@ -141,7 +142,6 @@ func (e *events) GetFieldSelector(involvedObjectName, involvedObjectNamespace, i
}
// Returns the appropriate field label to use for name of the involved object as per the given API version.
// DEPRECATED: please use "involvedObject.name" inline.
func GetInvolvedObjectNameFieldLabel(version string) string {
return "involvedObject.name"
}

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/klog"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -63,7 +63,7 @@ func ConfigFor(inConfig *rest.Config) *rest.Config {
config := rest.CopyConfig(inConfig)
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
config.ContentType = "application/vnd.kubernetes.protobuf"
config.NegotiatedSerializer = metainternalversionscheme.Codecs.WithoutConversion()
config.NegotiatedSerializer = metainternalversion.Codecs.WithoutConversion()
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}

View File

@@ -16,7 +16,7 @@
def version_x_defs():
# This should match the list of packages in kube::version::ldflag
stamp_pkgs = [
"k8s.io/component-base/version",
"k8s.io/kubernetes/pkg/version",
# In hack/lib/version.sh, this has a vendor/ prefix. That isn't needed here?
"k8s.io/client-go/pkg/version",
]

View File

@@ -190,10 +190,6 @@ func (c *fakeCodec) Encode(obj runtime.Object, stream io.Writer) error {
return nil
}
func (c *fakeCodec) Identifier() runtime.Identifier {
return runtime.Identifier("fake")
}
type fakeRoundTripper struct{}
func (r *fakeRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {

View File

@@ -691,33 +691,6 @@ func (r *Request) Stream() (io.ReadCloser, error) {
}
}
// requestPreflightCheck looks for common programmer errors on Request.
//
// We tackle here two programmer mistakes. The first one is to try to create
// something(POST) using an empty string as namespace with namespaceSet as
// true. If namespaceSet is true then namespace should also be defined. The
// second mistake is, when under the same circumstances, the programmer tries
// to GET, PUT or DELETE a named resource(resourceName != ""), again, if
// namespaceSet is true then namespace must not be empty.
func (r *Request) requestPreflightCheck() error {
if !r.namespaceSet {
return nil
}
if len(r.namespace) > 0 {
return nil
}
switch r.verb {
case "POST":
return fmt.Errorf("an empty namespace may not be set during creation")
case "GET", "PUT", "DELETE":
if len(r.resourceName) > 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
}
return nil
}
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
@@ -734,8 +707,12 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
return r.err
}
if err := r.requestPreflightCheck(); err != nil {
return err
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set during creation")
}
client := r.client
@@ -838,6 +815,8 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
// processing.
//
// Error type:
// * If the request can't be constructed, or an error happened earlier while building its
// arguments: *RequestConstructionError
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {

View File

@@ -1994,66 +1994,3 @@ func defaultResourcePathWithPrefix(prefix, resource, namespace, name string) str
}
return path
}
func TestRequestPreflightCheck(t *testing.T) {
for _, tt := range []struct {
name string
verbs []string
namespace string
resourceName string
namespaceSet bool
expectsErr bool
}{
{
name: "no namespace set",
verbs: []string{"GET", "PUT", "DELETE", "POST"},
namespaceSet: false,
expectsErr: false,
},
{
name: "empty resource name and namespace",
verbs: []string{"GET", "PUT", "DELETE"},
namespaceSet: true,
expectsErr: false,
},
{
name: "resource name with empty namespace",
verbs: []string{"GET", "PUT", "DELETE"},
namespaceSet: true,
resourceName: "ResourceName",
expectsErr: true,
},
{
name: "post empty resource name and namespace",
verbs: []string{"POST"},
namespaceSet: true,
expectsErr: true,
},
{
name: "working requests",
verbs: []string{"GET", "PUT", "DELETE", "POST"},
namespaceSet: true,
resourceName: "ResourceName",
namespace: "Namespace",
expectsErr: false,
},
} {
t.Run(tt.name, func(t *testing.T) {
for _, verb := range tt.verbs {
r := &Request{
verb: verb,
namespace: tt.namespace,
resourceName: tt.resourceName,
namespaceSet: tt.namespaceSet,
}
err := r.requestPreflightCheck()
hasErr := err != nil
if hasErr == tt.expectsErr {
return
}
t.Errorf("%s: expects error: %v, has error: %v", verb, tt.expectsErr, hasErr)
}
})
}
}

View File

@@ -539,6 +539,13 @@ func (f *DeltaFIFO) Resync() error {
return nil
}
func (f *DeltaFIFO) syncKey(key string) error {
f.lock.Lock()
defer f.lock.Unlock()
return f.syncKeyLocked(key)
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {

View File

@@ -29,9 +29,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/naming"
utilnet "k8s.io/apimachinery/pkg/util/net"
@@ -43,22 +41,15 @@ import (
"k8s.io/utils/trace"
)
const defaultExpectedTypeName = "<unspecified>"
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// metrics tracks basic metric information about the reflector
metrics *reflectorMetrics
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
@@ -111,35 +102,14 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
name: name,
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
period: time.Second,
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
r.setExpectedType(expectedType)
return r
}
func (r *Reflector) setExpectedType(expectedType interface{}) {
r.expectedType = reflect.TypeOf(expectedType)
if r.expectedType == nil {
r.expectedTypeName = defaultExpectedTypeName
return
}
r.expectedTypeName = r.expectedType.String()
if obj, ok := expectedType.(*unstructured.Unstructured); ok {
// Use gvk to check that watch event objects are of the desired type.
gvk := obj.GroupVersionKind()
if gvk.Empty() {
klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
return
}
r.expectedGVK = &gvk
r.expectedTypeName = gvk.String()
}
}
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/"}
@@ -147,7 +117,7 @@ var internalPackages = []string{"client-go/tools/cache/"}
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
@@ -159,6 +129,9 @@ var (
// nothing will ever be sent down this channel
neverExitWatch <-chan time.Time = make(chan time.Time)
// Used to indicate that watching stopped so that a resync could happen.
errorResyncRequested = errors.New("resync channel fired")
// Used to indicate that watching stopped because of a signal from the stop
// channel passed in from a client of the reflector.
errorStopRequested = errors.New("Stop requested")
@@ -182,7 +155,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List()
@@ -223,7 +196,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
@@ -302,9 +275,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
@@ -321,9 +294,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
}
return nil
@@ -363,17 +336,9 @@ loop:
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
meta, err := meta.Accessor(event.Object)
if err != nil {
@@ -415,7 +380,7 @@ loop:
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
return nil
}

View File

@@ -47,6 +47,19 @@ func (noopMetric) Dec() {}
func (noopMetric) Observe(float64) {}
func (noopMetric) Set(float64) {}
type reflectorMetrics struct {
numberOfLists CounterMetric
listDuration SummaryMetric
numberOfItemsInList SummaryMetric
numberOfWatches CounterMetric
numberOfShortWatches CounterMetric
watchDuration SummaryMetric
numberOfItemsInWatch SummaryMetric
lastResourceVersion GaugeMetric
}
// MetricsProvider generates various metrics used by the reflector.
type MetricsProvider interface {
NewListsMetric(name string) CounterMetric

View File

@@ -20,16 +20,13 @@ import (
"errors"
"fmt"
"math/rand"
"reflect"
"strconv"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
)
@@ -433,58 +430,3 @@ func TestReflectorWatchListPageSize(t *testing.T) {
t.Errorf("Expected 10 results, got %d", len(results))
}
}
func TestReflectorSetExpectedType(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{
Group: "mygroup",
Version: "v1",
Kind: "MyKind",
}
obj.SetGroupVersionKind(gvk)
testCases := map[string]struct {
inputType interface{}
expectedTypeName string
expectedType reflect.Type
expectedGVK *schema.GroupVersionKind
}{
"Nil type": {
expectedTypeName: defaultExpectedTypeName,
},
"Normal type": {
inputType: &v1.Pod{},
expectedTypeName: "*v1.Pod",
expectedType: reflect.TypeOf(&v1.Pod{}),
},
"Unstructured type without GVK": {
inputType: &unstructured.Unstructured{},
expectedTypeName: "*unstructured.Unstructured",
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
},
"Unstructured type with GVK": {
inputType: obj,
expectedTypeName: gvk.String(),
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
expectedGVK: &gvk,
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
r := &Reflector{}
r.setExpectedType(tc.inputType)
if tc.expectedType != r.expectedType {
t.Fatalf("Expected expectedType %v, got %v", tc.expectedType, r.expectedType)
}
if tc.expectedTypeName != r.expectedTypeName {
t.Fatalf("Expected expectedTypeName %v, got %v", tc.expectedTypeName, r.expectedTypeName)
}
gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil)
if tc.expectedGVK != nil && r.expectedGVK != nil {
gvkNotEqual = *tc.expectedGVK != *r.expectedGVK
}
if gvkNotEqual {
t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, r.expectedGVK)
}
})
}
}

View File

@@ -449,15 +449,13 @@ func (config *DirectClientConfig) getCluster() (clientcmdapi.Cluster, error) {
return clientcmdapi.Cluster{}, fmt.Errorf("cluster %q does not exist", clusterInfoName)
}
mergo.MergeWithOverwrite(mergedClusterInfo, config.overrides.ClusterInfo)
// * An override of --insecure-skip-tls-verify=true and no accompanying CA/CA data should clear already-set CA/CA data
// otherwise, a kubeconfig containing a CA reference would return an error that "CA and insecure-skip-tls-verify couldn't both be set".
// * An override of --certificate-authority should also override TLS skip settings and CA data, otherwise existing CA data will take precedence.
// An override of --insecure-skip-tls-verify=true and no accompanying CA/CA data should clear already-set CA/CA data
// otherwise, a kubeconfig containing a CA reference would return an error that "CA and insecure-skip-tls-verify couldn't both be set"
caLen := len(config.overrides.ClusterInfo.CertificateAuthority)
caDataLen := len(config.overrides.ClusterInfo.CertificateAuthorityData)
if config.overrides.ClusterInfo.InsecureSkipTLSVerify || caLen > 0 || caDataLen > 0 {
mergedClusterInfo.InsecureSkipTLSVerify = config.overrides.ClusterInfo.InsecureSkipTLSVerify
mergedClusterInfo.CertificateAuthority = config.overrides.ClusterInfo.CertificateAuthority
mergedClusterInfo.CertificateAuthorityData = config.overrides.ClusterInfo.CertificateAuthorityData
if config.overrides.ClusterInfo.InsecureSkipTLSVerify && caLen == 0 && caDataLen == 0 {
mergedClusterInfo.CertificateAuthority = ""
mergedClusterInfo.CertificateAuthorityData = nil
}
return *mergedClusterInfo, nil

View File

@@ -148,7 +148,7 @@ func TestInsecureOverridesCA(t *testing.T) {
actualCfg, err := clientBuilder.ClientConfig()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
t.Errorf("Unexpected error: %v", err)
}
matchBoolArg(true, actualCfg.Insecure, t)
@@ -156,30 +156,6 @@ func TestInsecureOverridesCA(t *testing.T) {
matchByteArg(nil, actualCfg.TLSClientConfig.CAData, t)
}
func TestCAOverridesCAData(t *testing.T) {
file, err := ioutil.TempFile("", "my.ca")
if err != nil {
t.Fatalf("could not create tempfile: %v", err)
}
defer os.Remove(file.Name())
config := createCAValidTestConfig()
clientBuilder := NewNonInteractiveClientConfig(*config, "clean", &ConfigOverrides{
ClusterInfo: clientcmdapi.Cluster{
CertificateAuthority: file.Name(),
},
}, nil)
actualCfg, err := clientBuilder.ClientConfig()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
matchBoolArg(false, actualCfg.Insecure, t)
matchStringArg(file.Name(), actualCfg.TLSClientConfig.CAFile, t)
matchByteArg(nil, actualCfg.TLSClientConfig.CAData, t)
}
func TestMergeContext(t *testing.T) {
const namespace = "overridden-namespace"

View File

@@ -53,6 +53,7 @@ type eventKey struct {
action string
reason string
reportingController string
reportingInstance string
regarding corev1.ObjectReference
related corev1.ObjectReference
}
@@ -102,10 +103,6 @@ func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[
}
}
func (e *eventBroadcasterImpl) Shutdown() {
e.Broadcaster.Shutdown()
}
// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// TODO: Investigate whether lock contention won't be a problem
@@ -267,6 +264,7 @@ func getKey(event *v1beta1.Event) eventKey {
action: event.Action,
reason: event.Reason,
reportingController: event.ReportingController,
reportingInstance: event.ReportingInstance,
regarding: event.Regarding,
}
if event.Related != nil {

View File

@@ -33,9 +33,6 @@ type EventRecorder interface {
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'action' explains what happened with regarding/what action did the ReportingController
// (ReportingController is a type of a Controller reporting an Event, e.g. k8s.io/node-controller, k8s.io/kubelet.)
// take in regarding's name; it should be in UpperCamelCase format (starting with a capital letter).
// 'note' is intended to be human readable.
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
}
@@ -54,9 +51,6 @@ type EventBroadcaster interface {
// NOTE: events received on your eventHandler should be copied before being used.
// TODO: figure out if this can be removed.
StartEventWatcher(eventHandler func(event runtime.Object)) func()
// Shutdown shuts down the broadcaster
Shutdown()
}
// EventSink knows how to store events (client-go implements it.)

View File

@@ -21,10 +21,9 @@ import (
"testing"
"time"
"net/http"
"k8s.io/apimachinery/pkg/util/clock"
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
"net/http"
)
type fakeLock struct {
@@ -32,8 +31,8 @@ type fakeLock struct {
}
// Get is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
return nil, nil, nil
func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, err error) {
return nil, nil
}
// Create is a dummy to allow us to have a fakeLock for testing.

View File

@@ -53,9 +53,9 @@ limitations under the License.
package leaderelection
import (
"bytes"
"context"
"fmt"
"reflect"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@@ -176,9 +176,8 @@ type LeaderCallbacks struct {
type LeaderElector struct {
config LeaderElectionConfig
// internal bookkeeping
observedRecord rl.LeaderElectionRecord
observedRawRecord []byte
observedTime time.Time
observedRecord rl.LeaderElectionRecord
observedTime time.Time
// used to implement OnNewLeader(), may lag slightly from the
// value observedRecord.HolderIdentity if the transition has
// not yet been reported.
@@ -325,7 +324,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
oldLeaderElectionRecord, err := le.config.Lock.Get()
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
@@ -341,9 +340,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// 2. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
le.observedRecord = *oldLeaderElectionRecord
le.observedRawRecord = oldLeaderElectionRawRecord
le.observedTime = le.clock.Now()
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
@@ -367,7 +365,6 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
klog.Errorf("Failed to update lock: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
return true

View File

@@ -37,7 +37,7 @@ import (
"k8s.io/client-go/tools/record"
)
func createLockObject(t *testing.T, objectType, namespace, name string, record rl.LeaderElectionRecord) (obj runtime.Object) {
func createLockObject(objectType, namespace, name string, record rl.LeaderElectionRecord) (obj runtime.Object) {
objectMeta := metav1.ObjectMeta{
Namespace: namespace,
Name: name,
@@ -59,7 +59,7 @@ func createLockObject(t *testing.T, objectType, namespace, name string, record r
spec := rl.LeaderElectionRecordToLeaseSpec(&record)
obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec}
default:
t.Fatal("unexpected objType:" + objectType)
panic("unexpected objType:" + objectType)
}
return
}
@@ -69,12 +69,6 @@ func TestTryAcquireOrRenewEndpoints(t *testing.T) {
testTryAcquireOrRenew(t, "endpoints")
}
type Reactor struct {
verb string
objectType string
reaction fakeclient.ReactionFunc
}
func testTryAcquireOrRenew(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour)
@@ -83,7 +77,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
name string
observedRecord rl.LeaderElectionRecord
observedTime time.Time
reactors []Reactor
reactors []struct {
verb string
reaction fakeclient.ReactionFunc
}
expectSuccess bool
transitionLeader bool
@@ -91,7 +88,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}{
{
name: "acquire from no object",
reactors: []Reactor{
reactors: []struct {
verb string
reaction fakeclient.ReactionFunc
}{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
@@ -110,11 +110,14 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
},
{
name: "acquire from unled object",
reactors: []Reactor{
reactors: []struct {
verb string
reaction fakeclient.ReactionFunc
}{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
@@ -131,11 +134,14 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
},
{
name: "acquire from led, unacked object",
reactors: []Reactor{
reactors: []struct {
verb string
reaction fakeclient.ReactionFunc
}{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
@@ -154,11 +160,14 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
},
{
name: "acquire from empty led, acked object",
reactors: []Reactor{
reactors: []struct {
verb string
reaction fakeclient.ReactionFunc
}{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: ""}), nil
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: ""}), nil
},
},
{
@@ -176,11 +185,14 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
},
{
name: "don't acquire from led, acked object",
reactors: []Reactor{
reactors: []struct {
verb string
reaction fakeclient.ReactionFunc
}{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
@@ -191,11 +203,14 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
},
{
name: "renew already acquired object",
reactors: []Reactor{
reactors: []struct {
verb string
reaction fakeclient.ReactionFunc
}{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
@@ -267,14 +282,13 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
},
},
}
observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
config: lec,
observedRecord: test.observedRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
@@ -338,560 +352,3 @@ func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) {
t.Errorf("diff: %v", diff.ObjectReflectDiff(oldRecord, newRecord))
}
}
func multiLockType(t *testing.T, objectType string) (primaryType, secondaryType string) {
switch objectType {
case rl.EndpointsLeasesResourceLock:
return rl.EndpointsResourceLock, rl.LeasesResourceLock
case rl.ConfigMapsLeasesResourceLock:
return rl.ConfigMapsResourceLock, rl.LeasesResourceLock
default:
t.Fatal("unexpected objType:" + objectType)
}
return
}
func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) {
var err error
switch objectType {
case "endpoints", "configmaps", "leases":
ret, err = json.Marshal(ler)
if err != nil {
t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
}
case "endpointsleases", "configmapsleases":
recordBytes, err := json.Marshal(ler)
if err != nil {
t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
}
ret = rl.ConcatRawRecord(recordBytes, recordBytes)
default:
t.Fatal("unexpected objType:" + objectType)
}
return
}
func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour)
primaryType, secondaryType := multiLockType(t, objectType)
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedRawRecord []byte
observedTime time.Time
reactors []Reactor
expectSuccess bool
transitionLeader bool
outHolder string
}{
{
name: "acquire from no object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
expectSuccess: true,
outHolder: "baz",
},
{
name: "acquire from unled old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from unled transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led, unack old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led, unack transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from conflict led, ack transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: rl.UnknownLeader,
},
{
name: "acquire from led, unack unknown object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "don't acquire from led, ack old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: "bing",
},
{
name: "don't acquire from led, acked new object, observe new record",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, secondaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: rl.UnknownLeader,
},
{
name: "don't acquire from led, acked new object, observe transition record",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: "bing",
},
{
name: "renew already required object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "baz"}),
observedTime: future,
expectSuccess: true,
outHolder: "baz",
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, reactor.objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
switch objectType {
case rl.EndpointsLeasesResourceLock:
lock = &rl.MultiLock{
Primary: &rl.EndpointsLock{
EndpointsMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
},
Secondary: &rl.LeaseLock{
LeaseMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoordinationV1(),
},
}
case rl.ConfigMapsLeasesResourceLock:
lock = &rl.MultiLock{
Primary: &rl.ConfigMapLock{
ConfigMapMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
},
Secondary: &rl.LeaseLock{
LeaseMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoordinationV1(),
},
}
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
}
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedRawRecord: test.observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("wrong number of api interactions")
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("leader should have transitioned but did not")
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("leader should not have transitioned but did")
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
}
})
}
}
// Will test leader election using endpointsleases as the resource
func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "endpointsleases")
}
// Will test leader election using configmapsleases as the resource
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "configmapsleases")
}

View File

@@ -41,23 +41,22 @@ type ConfigMapLock struct {
}
// Get returns the election record from a ConfigMap Annotation
func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, error) {
var record LeaderElectionRecord
var err error
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
return nil, err
}
if cml.cm.Annotations == nil {
cml.cm.Annotations = make(map[string]string)
}
recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]
if found {
if recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]; found {
if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
return nil, nil, err
return nil, err
}
}
return &record, []byte(recordBytes), nil
return &record, nil
}
// Create attempts to create a LeaderElectionRecord annotation
@@ -107,7 +106,7 @@ func (cml *ConfigMapLock) Describe() string {
return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name)
}
// Identity returns the Identity of the lock
// returns the Identity of the lock
func (cml *ConfigMapLock) Identity() string {
return cml.LockConfig.Identity
}

View File

@@ -36,23 +36,22 @@ type EndpointsLock struct {
}
// Get returns the election record from a Endpoints Annotation
func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
var record LeaderElectionRecord
var err error
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
return nil, err
}
if el.e.Annotations == nil {
el.e.Annotations = make(map[string]string)
}
recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]
if found {
if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found {
if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
return nil, nil, err
return nil, err
}
}
return &record, []byte(recordBytes), nil
return &record, nil
}
// Create attempts to create a LeaderElectionRecord annotation
@@ -102,7 +101,7 @@ func (el *EndpointsLock) Describe() string {
return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name)
}
// Identity returns the Identity of the lock
// returns the Identity of the lock
func (el *EndpointsLock) Identity() string {
return el.LockConfig.Identity
}

View File

@@ -30,8 +30,6 @@ const (
EndpointsResourceLock = "endpoints"
ConfigMapsResourceLock = "configmaps"
LeasesResourceLock = "leases"
EndpointsLeasesResourceLock = "endpointsleases"
ConfigMapsLeasesResourceLock = "configmapsleases"
)
// LeaderElectionRecord is the record that is stored in the leader election annotation.
@@ -73,7 +71,7 @@ type ResourceLockConfig struct {
// by the leaderelection code.
type Interface interface {
// Get returns the LeaderElectionRecord
Get() (*LeaderElectionRecord, []byte, error)
Get() (*LeaderElectionRecord, error)
// Create attempts to create a LeaderElectionRecord
Create(ler LeaderElectionRecord) error
@@ -94,46 +92,33 @@ type Interface interface {
// Manufacture will create a lock of a given type according to the input parameters
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
endpointsLock := &EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}
configmapLock := &ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}
leaseLock := &LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coordinationClient,
LockConfig: rlc,
}
switch lockType {
case EndpointsResourceLock:
return endpointsLock, nil
case ConfigMapsResourceLock:
return configmapLock, nil
case LeasesResourceLock:
return leaseLock, nil
case EndpointsLeasesResourceLock:
return &MultiLock{
Primary: endpointsLock,
Secondary: leaseLock,
return &EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}, nil
case ConfigMapsLeasesResourceLock:
return &MultiLock{
Primary: configmapLock,
Secondary: leaseLock,
case ConfigMapsResourceLock:
return &ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}, nil
case LeasesResourceLock:
return &LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coordinationClient,
LockConfig: rlc,
}, nil
default:
return nil, fmt.Errorf("Invalid lock-type %s", lockType)

View File

@@ -17,7 +17,6 @@ limitations under the License.
package resourcelock
import (
"encoding/json"
"errors"
"fmt"
@@ -37,18 +36,13 @@ type LeaseLock struct {
}
// Get returns the election record from a Lease spec
func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
func (ll *LeaseLock) Get() (*LeaderElectionRecord, error) {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
return nil, err
}
record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
recordByte, err := json.Marshal(*record)
if err != nil {
return nil, nil, err
}
return record, recordByte, nil
return LeaseSpecToLeaderElectionRecord(&ll.lease.Spec), nil
}
// Create attempts to create a Lease
@@ -90,7 +84,7 @@ func (ll *LeaseLock) Describe() string {
return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name)
}
// Identity returns the Identity of the lock
// returns the Identity of the lock
func (ll *LeaseLock) Identity() string {
return ll.LockConfig.Identity
}

View File

@@ -1,103 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcelock
import (
"bytes"
"encoding/json"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
const (
UnknownLeader = "leaderelection.k8s.io/unknown"
)
// MultiLock is used for lock's migration
type MultiLock struct {
Primary Interface
Secondary Interface
}
// Get returns the older election record of the lock
func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get()
if err != nil {
return nil, nil, err
}
secondary, secondaryRaw, err := ml.Secondary.Get()
if err != nil {
// Lock is held by old client
if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
return primary, primaryRaw, nil
}
return nil, nil, err
}
if primary.HolderIdentity != secondary.HolderIdentity {
primary.HolderIdentity = UnknownLeader
primaryRaw, err = json.Marshal(primary)
if err != nil {
return nil, nil, err
}
}
return primary, ConcatRawRecord(primaryRaw, secondaryRaw), nil
}
// Create attempts to create both primary lock and secondary lock
func (ml *MultiLock) Create(ler LeaderElectionRecord) error {
err := ml.Primary.Create(ler)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return ml.Secondary.Create(ler)
}
// Update will update and existing annotation on both two resources.
func (ml *MultiLock) Update(ler LeaderElectionRecord) error {
err := ml.Primary.Update(ler)
if err != nil {
return err
}
_, _, err = ml.Secondary.Get()
if err != nil && apierrors.IsNotFound(err) {
return ml.Secondary.Create(ler)
}
return ml.Secondary.Update(ler)
}
// RecordEvent in leader election while adding meta-data
func (ml *MultiLock) RecordEvent(s string) {
ml.Primary.RecordEvent(s)
ml.Secondary.RecordEvent(s)
}
// Describe is used to convert details on current resource lock
// into a string
func (ml *MultiLock) Describe() string {
return ml.Primary.Describe()
}
// Identity returns the Identity of the lock
func (ml *MultiLock) Identity() string {
return ml.Primary.Identity()
}
func ConcatRawRecord(primaryRaw, secondaryRaw []byte) []byte {
return bytes.Join([][]byte{primaryRaw, secondaryRaw}, []byte(","))
}

View File

@@ -127,9 +127,6 @@ type EventBroadcaster interface {
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
// Shutdown shuts down the broadcaster
Shutdown()
}
// Creates a new event broadcaster.
@@ -164,18 +161,14 @@ type eventBroadcasterImpl struct {
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
// The return value can be ignored or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
eventCorrelator := NewEventCorrelatorWithOptions(e.options)
return e.StartEventWatcher(
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
eventCorrelator := NewEventCorrelatorWithOptions(eventBroadcaster.options)
return eventBroadcaster.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, e.sleepDuration)
recordToSink(sink, event, eventCorrelator, eventBroadcaster.sleepDuration)
})
}
func (e *eventBroadcasterImpl) Shutdown() {
e.Broadcaster.Shutdown()
}
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
@@ -256,8 +249,8 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv
// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return e.StartEventWatcher(
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return eventBroadcaster.StartEventWatcher(
func(e *v1.Event) {
logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
})
@@ -265,8 +258,8 @@ func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...int
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher := e.Watch()
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher := eventBroadcaster.Watch()
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
@@ -283,8 +276,8 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) w
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}
type recorderImpl struct {

View File

@@ -72,22 +72,7 @@ func WriteCert(certPath string, data []byte) error {
// NewPool returns an x509.CertPool containing the certificates in the given PEM-encoded file.
// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates
func NewPool(filename string) (*x509.CertPool, error) {
pemBlock, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
pool, err := NewPoolFromBytes(pemBlock)
if err != nil {
return nil, fmt.Errorf("error creating pool from %s: %s", filename, err)
}
return pool, nil
}
// NewPoolFromBytes returns an x509.CertPool containing the certificates in the given PEM-encoded bytes.
// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates
func NewPoolFromBytes(pemBlock []byte) (*x509.CertPool, error) {
certs, err := ParseCertsPEM(pemBlock)
certs, err := CertsFromFile(filename)
if err != nil {
return nil, err
}

View File

@@ -42,64 +42,43 @@ var DefaultBackoff = wait.Backoff{
Jitter: 0.1,
}
// OnError allows the caller to retry fn in case the error returned by fn is retriable
// according to the provided function. backoff defines the maximum retries and the wait
// interval between two retries.
func OnError(backoff wait.Backoff, retriable func(error) bool, fn func() error) error {
var lastErr error
// OnError executes the provided function repeatedly, retrying if the server returns a specified
// error. Callers should preserve previous executions if they wish to retry changes. It performs an
// exponential backoff.
//
// var pod *api.Pod
// err := retry.OnError(DefaultBackoff, errors.IsConflict, func() (err error) {
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
// return
// })
// if err != nil {
// // may be conflict if max retries were hit
// return err
// }
// ...
//
// TODO: Make Backoff an interface?
func OnError(backoff wait.Backoff, errorFunc func(error) bool, fn func() error) error {
var lastConflictErr error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := fn()
switch {
case err == nil:
return true, nil
case retriable(err):
lastErr = err
case errorFunc(err):
lastConflictErr = err
return false, nil
default:
return false, err
}
})
if err == wait.ErrWaitTimeout {
err = lastErr
err = lastConflictErr
}
return err
}
// RetryOnConflict is used to make an update to a resource when you have to worry about
// conflicts caused by other code making unrelated updates to the resource at the same
// time. fn should fetch the resource to be modified, make appropriate changes to it, try
// to update it, and return (unmodified) the error from the update function. On a
// successful update, RetryOnConflict will return nil. If the update function returns a
// "Conflict" error, RetryOnConflict will wait some amount of time as described by
// backoff, and then try again. On a non-"Conflict" error, or if it retries too many times
// and gives up, RetryOnConflict will return an error to the caller.
//
// err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// // Fetch the resource here; you need to refetch it on every try, since
// // if you got a conflict on the last update attempt then you need to get
// // the current version before making your own changes.
// pod, err := c.Pods("mynamespace").Get(name, metav1.GetOptions{})
// if err ! nil {
// return err
// }
//
// // Make whatever updates to the resource are needed
// pod.Status.Phase = v1.PodFailed
//
// // Try to update
// _, err = c.Pods("mynamespace").UpdateStatus(pod)
// // You have to return err itself here (not wrapped inside another error)
// // so that RetryOnConflict can identify it correctly.
// return err
// })
// if err != nil {
// // May be conflict if max retries were hit, or may be something unrelated
// // like permissions or a network error
// return err
// }
// ...
//
// TODO: Make Backoff an interface?
// RetryOnConflict executes the function function repeatedly, retrying if the server returns a conflicting
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
return OnError(backoff, errors.IsConflict, fn)
}