mirror of
https://github.com/kubernetes/client-go.git
synced 2026-05-16 04:12:13 +00:00
Compare commits
36 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5be835d31a | ||
|
|
f20171e958 | ||
|
|
d98cb4daa7 | ||
|
|
a1640357a4 | ||
|
|
94abfc9324 | ||
|
|
9b7807ea0b | ||
|
|
25e88fd0c2 | ||
|
|
a3636b916c | ||
|
|
5ddf75f6a2 | ||
|
|
d32b1d7c08 | ||
|
|
e53b407e85 | ||
|
|
51c17ec9ab | ||
|
|
3c81ccc036 | ||
|
|
8205c38649 | ||
|
|
3bb5a08497 | ||
|
|
e9d3761795 | ||
|
|
8a3d757be0 | ||
|
|
58381003a1 | ||
|
|
d51156025a | ||
|
|
c3942e6cb3 | ||
|
|
705901c8a7 | ||
|
|
e42bae1174 | ||
|
|
27f722275a | ||
|
|
212fb452f3 | ||
|
|
133a487bfa | ||
|
|
ab9a2e8199 | ||
|
|
b2883ba878 | ||
|
|
fcb591bc0a | ||
|
|
b63afdf260 | ||
|
|
9c1395ec93 | ||
|
|
5aa9be71ec | ||
|
|
166ab05d11 | ||
|
|
4b5a63801d | ||
|
|
8e9dfea2c4 | ||
|
|
75d93aae71 | ||
|
|
9f44018370 |
20
go.mod
20
go.mod
@@ -15,19 +15,19 @@ require (
|
||||
github.com/google/gnostic v0.5.7-v3refs
|
||||
github.com/google/go-cmp v0.5.8
|
||||
github.com/google/gofuzz v1.1.0
|
||||
github.com/google/uuid v1.1.2
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
|
||||
github.com/imdario/mergo v0.3.6
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b
|
||||
github.com/stretchr/testify v1.8.0
|
||||
golang.org/x/net v0.8.0
|
||||
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
||||
golang.org/x/term v0.6.0
|
||||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
|
||||
google.golang.org/protobuf v1.28.0
|
||||
k8s.io/api v0.0.0-20220909104543-fce3016c0f35
|
||||
k8s.io/apimachinery v0.0.0-20220909103826-10b456c523c7
|
||||
k8s.io/api v0.25.13
|
||||
k8s.io/apimachinery v0.25.13
|
||||
k8s.io/klog/v2 v2.70.1
|
||||
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1
|
||||
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
|
||||
@@ -60,8 +60,8 @@ require (
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
golang.org/x/sys v0.6.0 // indirect
|
||||
golang.org/x/text v0.8.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
@@ -70,6 +70,6 @@ require (
|
||||
)
|
||||
|
||||
replace (
|
||||
k8s.io/api => k8s.io/api v0.0.0-20220909104543-fce3016c0f35
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220909103826-10b456c523c7
|
||||
k8s.io/api => k8s.io/api v0.25.13
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.25.13
|
||||
)
|
||||
|
||||
33
go.sum
33
go.sum
@@ -194,8 +194,9 @@ github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLe
|
||||
github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
|
||||
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
|
||||
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
|
||||
@@ -255,12 +256,14 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
@@ -355,8 +358,8 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
|
||||
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@@ -431,11 +434,11 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
|
||||
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@@ -444,8 +447,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
|
||||
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
@@ -665,10 +668,10 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
|
||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
k8s.io/api v0.0.0-20220909104543-fce3016c0f35 h1:u8CuY5/HNkUPwechzT9y48BarB4yAtn/meKrJn5j+38=
|
||||
k8s.io/api v0.0.0-20220909104543-fce3016c0f35/go.mod h1:FeqpcvGi6C+D7XYd+eThTUHnFpNx4enpIdClTz7Ww5Q=
|
||||
k8s.io/apimachinery v0.0.0-20220909103826-10b456c523c7 h1:kRQIdYUKC7FP4M4xmU03qFA+BVcX8BVFZDiyIZ2wCuI=
|
||||
k8s.io/apimachinery v0.0.0-20220909103826-10b456c523c7/go.mod h1:hqqA1X0bsgsxI6dXsJ4HnNTBOmJNxyPp8dw3u2fSHwA=
|
||||
k8s.io/api v0.25.13 h1:nOQWK5/ngLIG2CqmVV7uTFDsPCGkDk4kIGJ26t2AwIo=
|
||||
k8s.io/api v0.25.13/go.mod h1:yGpHyrivZ0enqWqT5s1pN98a4Q834rZkIUEABpleEtw=
|
||||
k8s.io/apimachinery v0.25.13 h1:byRHkSinOOVdo0pvjdblauFYfwAnx+JB8Pqi9w9weik=
|
||||
k8s.io/apimachinery v0.25.13/go.mod h1:IFwbcNi3gKkfDhuy0VYu3+BwbxbiIov3p6FR8ge1Epc=
|
||||
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
||||
k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ=
|
||||
k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -116,8 +117,11 @@ type Request struct {
|
||||
subresource string
|
||||
|
||||
// output
|
||||
err error
|
||||
body io.Reader
|
||||
err error
|
||||
|
||||
// only one of body / bodyBytes may be set. requests using body are not retriable.
|
||||
body io.Reader
|
||||
bodyBytes []byte
|
||||
|
||||
retryFn requestRetryFunc
|
||||
}
|
||||
@@ -443,12 +447,15 @@ func (r *Request) Body(obj interface{}) *Request {
|
||||
return r
|
||||
}
|
||||
glogBody("Request Body", data)
|
||||
r.body = bytes.NewReader(data)
|
||||
r.body = nil
|
||||
r.bodyBytes = data
|
||||
case []byte:
|
||||
glogBody("Request Body", t)
|
||||
r.body = bytes.NewReader(t)
|
||||
r.body = nil
|
||||
r.bodyBytes = t
|
||||
case io.Reader:
|
||||
r.body = t
|
||||
r.bodyBytes = nil
|
||||
case runtime.Object:
|
||||
// callers may pass typed interface pointers, therefore we must check nil with reflection
|
||||
if reflect.ValueOf(t).IsNil() {
|
||||
@@ -465,7 +472,8 @@ func (r *Request) Body(obj interface{}) *Request {
|
||||
return r
|
||||
}
|
||||
glogBody("Request Body", data)
|
||||
r.body = bytes.NewReader(data)
|
||||
r.body = nil
|
||||
r.bodyBytes = data
|
||||
r.SetHeader("Content-Type", r.c.content.ContentType)
|
||||
default:
|
||||
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
|
||||
@@ -825,9 +833,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.body != nil {
|
||||
req.Body = ioutil.NopCloser(r.body)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
updateURLMetrics(ctx, r, resp, err)
|
||||
retry.After(ctx, r, resp, err)
|
||||
@@ -889,8 +895,20 @@ func (r *Request) requestPreflightCheck() error {
|
||||
}
|
||||
|
||||
func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
|
||||
var body io.Reader
|
||||
switch {
|
||||
case r.body != nil && r.bodyBytes != nil:
|
||||
return nil, fmt.Errorf("cannot set both body and bodyBytes")
|
||||
case r.body != nil:
|
||||
body = r.body
|
||||
case r.bodyBytes != nil:
|
||||
// Create a new reader specifically for this request.
|
||||
// Giving each request a dedicated reader allows retries to avoid races resetting the request body.
|
||||
body = bytes.NewReader(r.bodyBytes)
|
||||
}
|
||||
|
||||
url := r.URL().String()
|
||||
req, err := http.NewRequest(r.verb, url, r.body)
|
||||
req, err := http.NewRequest(r.verb, url, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1123,42 +1123,6 @@ func TestRequestWatch(t *testing.T) {
|
||||
},
|
||||
Empty: true,
|
||||
},
|
||||
{
|
||||
name: "max retries 1, server returns a retry-after response, request body seek error",
|
||||
Request: &Request{
|
||||
body: &readSeeker{err: io.EOF},
|
||||
c: &RESTClient{
|
||||
base: &url.URL{},
|
||||
},
|
||||
},
|
||||
maxRetries: 1,
|
||||
attemptsExpected: 1,
|
||||
serverReturns: []responseErr{
|
||||
{response: retryAfterResponse(), err: nil},
|
||||
},
|
||||
Err: true,
|
||||
ErrFn: func(err error) bool {
|
||||
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "max retries 1, server returns a retryable error, request body seek error",
|
||||
Request: &Request{
|
||||
body: &readSeeker{err: io.EOF},
|
||||
c: &RESTClient{
|
||||
base: &url.URL{},
|
||||
},
|
||||
},
|
||||
maxRetries: 1,
|
||||
attemptsExpected: 1,
|
||||
serverReturns: []responseErr{
|
||||
{response: nil, err: io.EOF},
|
||||
},
|
||||
Err: true,
|
||||
ErrFn: func(err error) bool {
|
||||
return !apierrors.IsInternalError(err)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "max retries 2, server always returns a response with Retry-After header",
|
||||
Request: &Request{
|
||||
@@ -1320,7 +1284,7 @@ func TestRequestStream(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "max retries 1, server returns a retry-after response, request body seek error",
|
||||
name: "max retries 1, server returns a retry-after response, non-bytes request, no retry",
|
||||
Request: &Request{
|
||||
body: &readSeeker{err: io.EOF},
|
||||
c: &RESTClient{
|
||||
@@ -1333,9 +1297,6 @@ func TestRequestStream(t *testing.T) {
|
||||
{response: retryAfterResponse(), err: nil},
|
||||
},
|
||||
Err: true,
|
||||
ErrFn: func(err error) bool {
|
||||
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "max retries 2, server always returns a response with Retry-After header",
|
||||
@@ -2017,20 +1978,24 @@ func TestBody(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if r.body == nil {
|
||||
req, err := r.newHTTPRequest(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if req.Body == nil {
|
||||
if len(tt.expected) != 0 {
|
||||
t.Errorf("%d: r.body = %q; want %q", i, r.body, tt.expected)
|
||||
t.Errorf("%d: req.Body = %q; want %q", i, req.Body, tt.expected)
|
||||
}
|
||||
continue
|
||||
}
|
||||
buf := make([]byte, len(tt.expected))
|
||||
if _, err := r.body.Read(buf); err != nil {
|
||||
t.Errorf("%d: r.body.Read error: %v", i, err)
|
||||
if _, err := req.Body.Read(buf); err != nil {
|
||||
t.Errorf("%d: req.Body.Read error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
body := string(buf)
|
||||
if body != tt.expected {
|
||||
t.Errorf("%d: r.body = %q; want %q", i, body, tt.expected)
|
||||
t.Errorf("%d: req.Body = %q; want %q", i, body, tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2641,6 +2606,7 @@ func TestRequestWithRetry(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
body io.Reader
|
||||
bodyBytes []byte
|
||||
serverReturns responseErr
|
||||
errExpected error
|
||||
errContains string
|
||||
@@ -2648,53 +2614,53 @@ func TestRequestWithRetry(t *testing.T) {
|
||||
roundTripInvokedExpected int
|
||||
}{
|
||||
{
|
||||
name: "server returns retry-after response, request body is not io.Seeker, retry goes ahead",
|
||||
body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
||||
name: "server returns retry-after response, no request body, retry goes ahead",
|
||||
bodyBytes: nil,
|
||||
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
|
||||
errExpected: nil,
|
||||
transformFuncInvokedExpected: 1,
|
||||
roundTripInvokedExpected: 2,
|
||||
},
|
||||
{
|
||||
name: "server returns retry-after response, request body Seek returns error, retry aborted",
|
||||
body: &readSeeker{err: io.EOF},
|
||||
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
|
||||
errExpected: nil,
|
||||
transformFuncInvokedExpected: 0,
|
||||
roundTripInvokedExpected: 1,
|
||||
},
|
||||
{
|
||||
name: "server returns retry-after response, request body Seek returns no error, retry goes ahead",
|
||||
body: &readSeeker{err: nil},
|
||||
name: "server returns retry-after response, bytes request body, retry goes ahead",
|
||||
bodyBytes: []byte{},
|
||||
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
|
||||
errExpected: nil,
|
||||
transformFuncInvokedExpected: 1,
|
||||
roundTripInvokedExpected: 2,
|
||||
},
|
||||
{
|
||||
name: "server returns retryable err, request body is not io.Seek, retry goes ahead",
|
||||
body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
||||
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
|
||||
errExpected: io.ErrUnexpectedEOF,
|
||||
transformFuncInvokedExpected: 0,
|
||||
roundTripInvokedExpected: 2,
|
||||
},
|
||||
{
|
||||
name: "server returns retryable err, request body Seek returns error, retry aborted",
|
||||
body: &readSeeker{err: io.EOF},
|
||||
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
|
||||
errContains: "failed to reset the request body while retrying a request: EOF",
|
||||
transformFuncInvokedExpected: 0,
|
||||
name: "server returns retry-after response, opaque request body, retry aborted",
|
||||
body: &readSeeker{},
|
||||
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
|
||||
errExpected: nil,
|
||||
transformFuncInvokedExpected: 1,
|
||||
roundTripInvokedExpected: 1,
|
||||
},
|
||||
{
|
||||
name: "server returns retryable err, request body Seek returns no err, retry goes ahead",
|
||||
body: &readSeeker{err: nil},
|
||||
name: "server returns retryable err, no request body, retry goes ahead",
|
||||
bodyBytes: nil,
|
||||
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
|
||||
errExpected: io.ErrUnexpectedEOF,
|
||||
transformFuncInvokedExpected: 0,
|
||||
roundTripInvokedExpected: 2,
|
||||
},
|
||||
{
|
||||
name: "server returns retryable err, bytes request body, retry goes ahead",
|
||||
bodyBytes: []byte{},
|
||||
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
|
||||
errExpected: io.ErrUnexpectedEOF,
|
||||
transformFuncInvokedExpected: 0,
|
||||
roundTripInvokedExpected: 2,
|
||||
},
|
||||
{
|
||||
name: "server returns retryable err, opaque request body, retry aborted",
|
||||
body: &readSeeker{},
|
||||
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
|
||||
errExpected: io.ErrUnexpectedEOF,
|
||||
transformFuncInvokedExpected: 0,
|
||||
roundTripInvokedExpected: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
@@ -2865,7 +2831,8 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
|
||||
tests := []struct {
|
||||
name string
|
||||
verb string
|
||||
body func() io.Reader
|
||||
body io.Reader
|
||||
bodyBytes []byte
|
||||
maxRetries int
|
||||
serverReturns []responseErr
|
||||
|
||||
@@ -2875,7 +2842,7 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
|
||||
{
|
||||
name: "server always returns retry-after response",
|
||||
verb: "GET",
|
||||
body: func() io.Reader { return bytes.NewReader([]byte{}) },
|
||||
bodyBytes: []byte{},
|
||||
maxRetries: 2,
|
||||
serverReturns: []responseErr{
|
||||
{response: retryAfterResponse(), err: nil},
|
||||
@@ -2903,7 +2870,7 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
|
||||
{
|
||||
name: "server always returns retryable error",
|
||||
verb: "GET",
|
||||
body: func() io.Reader { return bytes.NewReader([]byte{}) },
|
||||
bodyBytes: []byte{},
|
||||
maxRetries: 2,
|
||||
serverReturns: []responseErr{
|
||||
{response: nil, err: io.EOF},
|
||||
@@ -2932,7 +2899,7 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
|
||||
{
|
||||
name: "server returns success on the final retry",
|
||||
verb: "GET",
|
||||
body: func() io.Reader { return bytes.NewReader([]byte{}) },
|
||||
bodyBytes: []byte{},
|
||||
maxRetries: 2,
|
||||
serverReturns: []responseErr{
|
||||
{response: retryAfterResponse(), err: nil},
|
||||
@@ -2979,13 +2946,10 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
|
||||
return resp, test.serverReturns[attempts].err
|
||||
})
|
||||
|
||||
reqCountGot := newCount()
|
||||
reqRecorder := newReadTracker(reqCountGot)
|
||||
reqRecorder.delegated = test.body()
|
||||
|
||||
req := &Request{
|
||||
verb: test.verb,
|
||||
body: reqRecorder,
|
||||
verb: test.verb,
|
||||
body: test.body,
|
||||
bodyBytes: test.bodyBytes,
|
||||
c: &RESTClient{
|
||||
content: defaultContentConfig(),
|
||||
Client: client,
|
||||
@@ -3005,9 +2969,6 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
|
||||
t.Errorf("Expected retries: %d, but got: %d", expected.attempts, attempts)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(expected.reqCount.seeks, reqCountGot.seeks) {
|
||||
t.Errorf("Expected request body to have seek invocation: %v, but got: %v", expected.reqCount.seeks, reqCountGot.seeks)
|
||||
}
|
||||
if expected.respCount.closes != respCountGot.getCloseCount() {
|
||||
t.Errorf("Expected response body Close to be invoked %d times, but got: %d", expected.respCount.closes, respCountGot.getCloseCount())
|
||||
}
|
||||
@@ -3264,8 +3225,8 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
|
||||
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
|
||||
}
|
||||
req := &Request{
|
||||
verb: "GET",
|
||||
body: bytes.NewReader([]byte{}),
|
||||
verb: "GET",
|
||||
bodyBytes: []byte{},
|
||||
c: &RESTClient{
|
||||
base: base,
|
||||
content: defaultContentConfig(),
|
||||
@@ -3400,8 +3361,8 @@ func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.
|
||||
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
|
||||
}
|
||||
req := &Request{
|
||||
verb: "GET",
|
||||
body: bytes.NewReader([]byte{}),
|
||||
verb: "GET",
|
||||
bodyBytes: []byte{},
|
||||
c: &RESTClient{
|
||||
base: base,
|
||||
content: defaultContentConfig(),
|
||||
@@ -3575,8 +3536,8 @@ func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r
|
||||
t.Fatalf("Failed to create new HTTP request - %v", err)
|
||||
}
|
||||
req := &Request{
|
||||
verb: "GET",
|
||||
body: bytes.NewReader([]byte{}),
|
||||
verb: "GET",
|
||||
bodyBytes: []byte{},
|
||||
c: &RESTClient{
|
||||
base: base,
|
||||
content: defaultContentConfig(),
|
||||
@@ -3811,104 +3772,3 @@ func TestTransportConcurrency(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: see if we can consolidate the other trackers into one.
|
||||
type requestBodyTracker struct {
|
||||
io.ReadSeeker
|
||||
f func(string)
|
||||
}
|
||||
|
||||
func (t *requestBodyTracker) Read(p []byte) (int, error) {
|
||||
t.f("Request.Body.Read")
|
||||
return t.ReadSeeker.Read(p)
|
||||
}
|
||||
|
||||
func (t *requestBodyTracker) Seek(offset int64, whence int) (int64, error) {
|
||||
t.f("Request.Body.Seek")
|
||||
return t.ReadSeeker.Seek(offset, whence)
|
||||
}
|
||||
|
||||
type responseBodyTracker struct {
|
||||
io.ReadCloser
|
||||
f func(string)
|
||||
}
|
||||
|
||||
func (t *responseBodyTracker) Read(p []byte) (int, error) {
|
||||
t.f("Response.Body.Read")
|
||||
return t.ReadCloser.Read(p)
|
||||
}
|
||||
|
||||
func (t *responseBodyTracker) Close() error {
|
||||
t.f("Response.Body.Close")
|
||||
return t.ReadCloser.Close()
|
||||
}
|
||||
|
||||
type recorder struct {
|
||||
order []string
|
||||
}
|
||||
|
||||
func (r *recorder) record(call string) {
|
||||
r.order = append(r.order, call)
|
||||
}
|
||||
|
||||
func TestRequestBodyResetOrder(t *testing.T) {
|
||||
recorder := &recorder{}
|
||||
respBodyTracker := &responseBodyTracker{
|
||||
ReadCloser: nil, // the server will fill it
|
||||
f: recorder.record,
|
||||
}
|
||||
|
||||
var attempts int
|
||||
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
|
||||
defer func() {
|
||||
attempts++
|
||||
}()
|
||||
|
||||
// read the request body.
|
||||
ioutil.ReadAll(req.Body)
|
||||
|
||||
// first attempt, we send a retry-after
|
||||
if attempts == 0 {
|
||||
resp := retryAfterResponse()
|
||||
respBodyTracker.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte{}))
|
||||
resp.Body = respBodyTracker
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
return &http.Response{StatusCode: http.StatusOK}, nil
|
||||
})
|
||||
|
||||
reqBodyTracker := &requestBodyTracker{
|
||||
ReadSeeker: bytes.NewReader([]byte{}), // empty body ensures one Read operation at most.
|
||||
f: recorder.record,
|
||||
}
|
||||
req := &Request{
|
||||
verb: "POST",
|
||||
body: reqBodyTracker,
|
||||
c: &RESTClient{
|
||||
content: defaultContentConfig(),
|
||||
Client: client,
|
||||
},
|
||||
backoff: &noSleepBackOff{},
|
||||
maxRetries: 1,
|
||||
retryFn: defaultRequestRetryFn,
|
||||
}
|
||||
|
||||
req.Do(context.Background())
|
||||
|
||||
expected := []string{
|
||||
// 1st attempt: the server handler reads the request body
|
||||
"Request.Body.Read",
|
||||
// the server sends a retry-after, client reads the
|
||||
// response body, and closes it
|
||||
"Response.Body.Read",
|
||||
"Response.Body.Close",
|
||||
// client retry logic seeks to the beginning of the request body
|
||||
"Request.Body.Seek",
|
||||
// 2nd attempt: the server reads the request body
|
||||
"Request.Body.Read",
|
||||
}
|
||||
if !reflect.DeepEqual(expected, recorder.order) {
|
||||
t.Errorf("Expected invocation request and response body operations for retry do not match: %s", cmp.Diff(expected, recorder.order))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,6 +154,11 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *
|
||||
return false
|
||||
}
|
||||
|
||||
if restReq.body != nil {
|
||||
// we have an opaque reader, we can't safely reset it
|
||||
return false
|
||||
}
|
||||
|
||||
r.attempts++
|
||||
r.retryAfter = &RetryAfter{Attempt: r.attempts}
|
||||
if r.attempts > r.maxRetries {
|
||||
@@ -210,18 +215,6 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// At this point we've made atleast one attempt, post which the response
|
||||
// body should have been fully read and closed in order for it to be safe
|
||||
// to reset the request body before we reconnect, in order for us to reuse
|
||||
// the same TCP connection.
|
||||
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
|
||||
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
|
||||
err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err)
|
||||
r.trackPreviousError(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// if we are here, we have made attempt(s) at least once before.
|
||||
if request.backoff != nil {
|
||||
delay := request.backoff.CalculateBackoff(url)
|
||||
|
||||
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package rest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -212,7 +211,7 @@ func TestIsNextRetry(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
restReq := &Request{
|
||||
body: bytes.NewReader([]byte{}),
|
||||
bodyBytes: []byte{},
|
||||
c: &RESTClient{
|
||||
base: &url.URL{},
|
||||
},
|
||||
|
||||
22
tools/cache/controller.go
vendored
22
tools/cache/controller.go
vendored
@@ -353,17 +353,6 @@ func NewIndexerInformer(
|
||||
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
|
||||
}
|
||||
|
||||
// TransformFunc allows for transforming an object before it will be processed
|
||||
// and put into the controller cache and before the corresponding handlers will
|
||||
// be called on it.
|
||||
// TransformFunc (similarly to ResourceEventHandler functions) should be able
|
||||
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
|
||||
//
|
||||
// The most common usage pattern is to clean-up some parts of the object to
|
||||
// reduce component memory usage if a given component doesn't care about them.
|
||||
// given controller doesn't care for them
|
||||
type TransformFunc func(interface{}) (interface{}, error)
|
||||
|
||||
// NewTransformingInformer returns a Store and a controller for populating
|
||||
// the store while also providing event notifications. You should only used
|
||||
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
|
||||
@@ -411,19 +400,11 @@ func processDeltas(
|
||||
// Object which receives event notifications from the given deltas
|
||||
handler ResourceEventHandler,
|
||||
clientState Store,
|
||||
transformer TransformFunc,
|
||||
deltas Deltas,
|
||||
) error {
|
||||
// from oldest to newest
|
||||
for _, d := range deltas {
|
||||
obj := d.Object
|
||||
if transformer != nil {
|
||||
var err error
|
||||
obj, err = transformer(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
switch d.Type {
|
||||
case Sync, Replaced, Added, Updated:
|
||||
@@ -475,6 +456,7 @@ func newInformer(
|
||||
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: transformer,
|
||||
})
|
||||
|
||||
cfg := &Config{
|
||||
@@ -486,7 +468,7 @@ func newInformer(
|
||||
|
||||
Process: func(obj interface{}) error {
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(h, clientState, transformer, deltas)
|
||||
return processDeltas(h, clientState, deltas)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
},
|
||||
|
||||
4
tools/cache/controller_test.go
vendored
4
tools/cache/controller_test.go
vendored
@@ -23,7 +23,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -32,7 +32,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
fcache "k8s.io/client-go/tools/cache/testing"
|
||||
|
||||
"github.com/google/gofuzz"
|
||||
fuzz "github.com/google/gofuzz"
|
||||
)
|
||||
|
||||
func Example() {
|
||||
|
||||
135
tools/cache/delta_fifo.go
vendored
135
tools/cache/delta_fifo.go
vendored
@@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
|
||||
// When true, `Replaced` events will be sent for items passed to a Replace() call.
|
||||
// When false, `Sync` events will be sent instead.
|
||||
EmitDeltaTypeReplaced bool
|
||||
|
||||
// If set, will be called for objects before enqueueing them. Please
|
||||
// see the comment on TransformFunc for details.
|
||||
Transformer TransformFunc
|
||||
}
|
||||
|
||||
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
|
||||
@@ -129,8 +133,32 @@ type DeltaFIFO struct {
|
||||
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
|
||||
// DeltaType when Replace() is called (to preserve backwards compat).
|
||||
emitDeltaTypeReplaced bool
|
||||
|
||||
// Called with every object if non-nil.
|
||||
transformer TransformFunc
|
||||
}
|
||||
|
||||
// TransformFunc allows for transforming an object before it will be processed.
|
||||
// TransformFunc (similarly to ResourceEventHandler functions) should be able
|
||||
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
|
||||
//
|
||||
// New in v1.27: In such cases, the contained object will already have gone
|
||||
// through the transform object separately (when it was added / updated prior
|
||||
// to the delete), so the TransformFunc can likely safely ignore such objects
|
||||
// (i.e., just return the input object).
|
||||
//
|
||||
// The most common usage pattern is to clean-up some parts of the object to
|
||||
// reduce component memory usage if a given component doesn't care about them.
|
||||
//
|
||||
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
|
||||
// sees the object before any other actor, and it is now safe to mutate the
|
||||
// object in place instead of making a copy.
|
||||
//
|
||||
// Note that TransformFunc is called while inserting objects into the
|
||||
// notification queue and is therefore extremely performance sensitive; please
|
||||
// do not do anything that will take a long time.
|
||||
type TransformFunc func(interface{}) (interface{}, error)
|
||||
|
||||
// DeltaType is the type of a change (addition, deletion, etc)
|
||||
type DeltaType string
|
||||
|
||||
@@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
knownObjects: opts.KnownObjects,
|
||||
|
||||
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
|
||||
transformer: opts.Transformer,
|
||||
}
|
||||
f.cond.L = &f.lock
|
||||
return f
|
||||
@@ -411,6 +440,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
|
||||
// Every object comes through this code path once, so this is a good
|
||||
// place to call the transform func. If obj is a
|
||||
// DeletedFinalStateUnknown tombstone, then the containted inner object
|
||||
// will already have gone through the transformer, but we document that
|
||||
// this can happen. In cases involving Replace(), such an object can
|
||||
// come through multiple times.
|
||||
if f.transformer != nil {
|
||||
var err error
|
||||
obj, err = f.transformer(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
oldDeltas := f.items[id]
|
||||
newDeltas := append(oldDeltas, Delta{actionType, obj})
|
||||
newDeltas = dedupDeltas(newDeltas)
|
||||
@@ -566,12 +610,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
||||
// using the Sync or Replace DeltaType and then (2) it does some deletions.
|
||||
// In particular: for every pre-existing key K that is not the key of
|
||||
// an object in `list` there is the effect of
|
||||
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
|
||||
// of K. If `f.knownObjects == nil` then the pre-existing keys are
|
||||
// those in `f.items` and the current object of K is the `.Newest()`
|
||||
// of the Deltas associated with K. Otherwise the pre-existing keys
|
||||
// are those listed by `f.knownObjects` and the current object of K is
|
||||
// what `f.knownObjects.GetByKey(K)` returns.
|
||||
// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
|
||||
// object of K. The pre-existing keys are those in the union set of the keys in
|
||||
// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
|
||||
// the one present in the last delta in `f.items`. If there is no delta for K
|
||||
// in `f.items`, it is the object in `f.knownObjects`
|
||||
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
@@ -595,56 +638,54 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
|
||||
}
|
||||
}
|
||||
|
||||
if f.knownObjects == nil {
|
||||
// Do deletion detection against our own list.
|
||||
queuedDeletions := 0
|
||||
for k, oldItem := range f.items {
|
||||
// Do deletion detection against objects in the queue
|
||||
queuedDeletions := 0
|
||||
for k, oldItem := range f.items {
|
||||
if keys.Has(k) {
|
||||
continue
|
||||
}
|
||||
// Delete pre-existing items not in the new list.
|
||||
// This could happen if watch deletion event was missed while
|
||||
// disconnected from apiserver.
|
||||
var deletedObj interface{}
|
||||
if n := oldItem.Newest(); n != nil {
|
||||
deletedObj = n.Object
|
||||
|
||||
// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
|
||||
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
|
||||
deletedObj = d.Obj
|
||||
}
|
||||
}
|
||||
queuedDeletions++
|
||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if f.knownObjects != nil {
|
||||
// Detect deletions for objects not present in the queue, but present in KnownObjects
|
||||
knownKeys := f.knownObjects.ListKeys()
|
||||
for _, k := range knownKeys {
|
||||
if keys.Has(k) {
|
||||
continue
|
||||
}
|
||||
// Delete pre-existing items not in the new list.
|
||||
// This could happen if watch deletion event was missed while
|
||||
// disconnected from apiserver.
|
||||
var deletedObj interface{}
|
||||
if n := oldItem.Newest(); n != nil {
|
||||
deletedObj = n.Object
|
||||
if len(f.items[k]) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
deletedObj, exists, err := f.knownObjects.GetByKey(k)
|
||||
if err != nil {
|
||||
deletedObj = nil
|
||||
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
|
||||
} else if !exists {
|
||||
deletedObj = nil
|
||||
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
|
||||
}
|
||||
queuedDeletions++
|
||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !f.populated {
|
||||
f.populated = true
|
||||
// While there shouldn't be any queued deletions in the initial
|
||||
// population of the queue, it's better to be on the safe side.
|
||||
f.initialPopulationCount = keys.Len() + queuedDeletions
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Detect deletions not already in the queue.
|
||||
knownKeys := f.knownObjects.ListKeys()
|
||||
queuedDeletions := 0
|
||||
for _, k := range knownKeys {
|
||||
if keys.Has(k) {
|
||||
continue
|
||||
}
|
||||
|
||||
deletedObj, exists, err := f.knownObjects.GetByKey(k)
|
||||
if err != nil {
|
||||
deletedObj = nil
|
||||
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
|
||||
} else if !exists {
|
||||
deletedObj = nil
|
||||
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
|
||||
}
|
||||
queuedDeletions++
|
||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !f.populated {
|
||||
|
||||
269
tools/cache/delta_fifo_test.go
vendored
269
tools/cache/delta_fifo_test.go
vendored
@@ -121,6 +121,130 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) {
|
||||
obj := mkFifoObj("foo", 2)
|
||||
objV2 := mkFifoObj("foo", 3)
|
||||
table := []struct {
|
||||
name string
|
||||
operations func(f *DeltaFIFO)
|
||||
expectedDeltas Deltas
|
||||
}{
|
||||
{
|
||||
name: "Added object should be deleted on Replace",
|
||||
operations: func(f *DeltaFIFO) {
|
||||
f.Add(obj)
|
||||
f.Replace([]interface{}{}, "0")
|
||||
},
|
||||
expectedDeltas: Deltas{
|
||||
{Added, obj},
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Replaced object should have only a single Delete",
|
||||
operations: func(f *DeltaFIFO) {
|
||||
f.emitDeltaTypeReplaced = true
|
||||
f.Add(obj)
|
||||
f.Replace([]interface{}{obj}, "0")
|
||||
f.Replace([]interface{}{}, "0")
|
||||
},
|
||||
expectedDeltas: Deltas{
|
||||
{Added, obj},
|
||||
{Replaced, obj},
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Deleted object should have only a single Delete",
|
||||
operations: func(f *DeltaFIFO) {
|
||||
f.Add(obj)
|
||||
f.Delete(obj)
|
||||
f.Replace([]interface{}{}, "0")
|
||||
},
|
||||
expectedDeltas: Deltas{
|
||||
{Added, obj},
|
||||
{Deleted, obj},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Synced objects should have a single delete",
|
||||
operations: func(f *DeltaFIFO) {
|
||||
f.Add(obj)
|
||||
f.Replace([]interface{}{obj}, "0")
|
||||
f.Replace([]interface{}{obj}, "0")
|
||||
f.Replace([]interface{}{}, "0")
|
||||
},
|
||||
expectedDeltas: Deltas{
|
||||
{Added, obj},
|
||||
{Sync, obj},
|
||||
{Sync, obj},
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Added objects should have a single delete on multiple Replaces",
|
||||
operations: func(f *DeltaFIFO) {
|
||||
f.Add(obj)
|
||||
f.Replace([]interface{}{}, "0")
|
||||
f.Replace([]interface{}{}, "1")
|
||||
},
|
||||
expectedDeltas: Deltas{
|
||||
{Added, obj},
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Added and deleted and added object should be deleted",
|
||||
operations: func(f *DeltaFIFO) {
|
||||
f.Add(obj)
|
||||
f.Delete(obj)
|
||||
f.Add(objV2)
|
||||
f.Replace([]interface{}{}, "0")
|
||||
},
|
||||
expectedDeltas: Deltas{
|
||||
{Added, obj},
|
||||
{Deleted, obj},
|
||||
{Added, objV2},
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range table {
|
||||
tt := tt
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Test with a DeltaFIFO with a backing KnownObjects
|
||||
fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: testFifoObjectKeyFunc,
|
||||
KnownObjects: literalListerGetter(func() []testFifoObject {
|
||||
return []testFifoObject{}
|
||||
}),
|
||||
})
|
||||
tt.operations(fWithKnownObjects)
|
||||
actualDeltasWithKnownObjects := Pop(fWithKnownObjects)
|
||||
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) {
|
||||
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects)
|
||||
}
|
||||
if len(fWithKnownObjects.items) != 0 {
|
||||
t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items)
|
||||
}
|
||||
|
||||
// Test with a DeltaFIFO without a backing KnownObjects
|
||||
fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: testFifoObjectKeyFunc,
|
||||
})
|
||||
tt.operations(fWithoutKnownObjects)
|
||||
actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects)
|
||||
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) {
|
||||
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects)
|
||||
}
|
||||
if len(fWithoutKnownObjects.items) != 0 {
|
||||
t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
|
||||
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
||||
|
||||
@@ -203,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type rvAndXfrm struct {
|
||||
rv int
|
||||
xfrm int
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_transformer(t *testing.T) {
|
||||
mk := func(name string, rv int) testFifoObject {
|
||||
return mkFifoObj(name, &rvAndXfrm{rv, 0})
|
||||
}
|
||||
xfrm := TransformFunc(func(obj interface{}) (interface{}, error) {
|
||||
switch v := obj.(type) {
|
||||
case testFifoObject:
|
||||
v.val.(*rvAndXfrm).xfrm++
|
||||
case DeletedFinalStateUnknown:
|
||||
if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 {
|
||||
return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
||||
}
|
||||
return obj, nil
|
||||
})
|
||||
|
||||
must := func(err error) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: testFifoObjectKeyFunc,
|
||||
Transformer: xfrm,
|
||||
})
|
||||
must(f.Add(mk("foo", 10)))
|
||||
must(f.Add(mk("bar", 11)))
|
||||
must(f.Update(mk("foo", 12)))
|
||||
must(f.Delete(mk("foo", 15)))
|
||||
must(f.Replace([]interface{}{}, ""))
|
||||
must(f.Add(mk("bar", 16)))
|
||||
must(f.Replace([]interface{}{}, ""))
|
||||
|
||||
// Should be empty
|
||||
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
obj, err := f.Pop(func(o interface{}) error { return nil })
|
||||
if err != nil {
|
||||
t.Fatalf("got nothing on try %v?", i)
|
||||
}
|
||||
obj = obj.(Deltas).Newest().Object
|
||||
switch v := obj.(type) {
|
||||
case testFifoObject:
|
||||
if v.name != "foo" {
|
||||
t.Errorf("expected regular deletion of foo, got %q", v.name)
|
||||
}
|
||||
rx := v.val.(*rvAndXfrm)
|
||||
if rx.rv != 15 {
|
||||
t.Errorf("expected last message, got %#v", obj)
|
||||
}
|
||||
if rx.xfrm != 1 {
|
||||
t.Errorf("obj %v transformed wrong number of times.", obj)
|
||||
}
|
||||
case DeletedFinalStateUnknown:
|
||||
tf := v.Obj.(testFifoObject)
|
||||
rx := tf.val.(*rvAndXfrm)
|
||||
if tf.name != "bar" {
|
||||
t.Errorf("expected tombstone deletion of bar, got %q", tf.name)
|
||||
}
|
||||
if rx.rv != 16 {
|
||||
t.Errorf("expected last message, got %#v", obj)
|
||||
}
|
||||
if rx.xfrm != 1 {
|
||||
t.Errorf("tombstoned obj %v transformed wrong number of times.", obj)
|
||||
}
|
||||
default:
|
||||
t.Errorf("unknown item %#v", obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
|
||||
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
@@ -371,7 +577,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||
|
||||
expectedList = []Deltas{
|
||||
{{Added, mkFifoObj("baz", 10)},
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
|
||||
{{Sync, mkFifoObj("foo", 5)}},
|
||||
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||
// it should get a tombstone key with the right Obj.
|
||||
@@ -385,6 +591,67 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Now try deleting and recreating the object in the queue, then delete it by a Replace call
|
||||
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: testFifoObjectKeyFunc,
|
||||
KnownObjects: literalListerGetter(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
})
|
||||
f.Delete(mkFifoObj("bar", 6))
|
||||
f.Add(mkFifoObj("bar", 100))
|
||||
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
|
||||
|
||||
expectedList = []Deltas{
|
||||
{
|
||||
{Deleted, mkFifoObj("bar", 6)},
|
||||
{Added, mkFifoObj("bar", 100)},
|
||||
// Since "bar" has a newer object in the queue than in the state,
|
||||
// it should get a tombstone key with the latest object from the queue
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
|
||||
},
|
||||
{{Sync, mkFifoObj("foo", 5)}},
|
||||
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
|
||||
}
|
||||
|
||||
for _, expected := range expectedList {
|
||||
cur := Pop(f).(Deltas)
|
||||
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
// Now try syncing it first to ensure the delete use the latest version
|
||||
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: testFifoObjectKeyFunc,
|
||||
KnownObjects: literalListerGetter(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
})
|
||||
f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0")
|
||||
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
|
||||
|
||||
expectedList = []Deltas{
|
||||
{
|
||||
{Sync, mkFifoObj("bar", 100)},
|
||||
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||
// it should get a tombstone key with the right Obj.
|
||||
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
|
||||
},
|
||||
{
|
||||
{Sync, mkFifoObj("foo", 5)},
|
||||
{Sync, mkFifoObj("foo", 5)},
|
||||
},
|
||||
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
|
||||
}
|
||||
|
||||
for _, expected := range expectedList {
|
||||
cur := Pop(f).(Deltas)
|
||||
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
// Now try starting without an explicit KeyListerGetter
|
||||
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
||||
f.Add(mkFifoObj("baz", 10))
|
||||
|
||||
8
tools/cache/shared_informer.go
vendored
8
tools/cache/shared_informer.go
vendored
@@ -190,10 +190,7 @@ type SharedInformer interface {
|
||||
//
|
||||
// Must be set before starting the informer.
|
||||
//
|
||||
// Note: Since the object given to the handler may be already shared with
|
||||
// other goroutines, it is advisable to copy the object being
|
||||
// transform before mutating it at all and returning the copy to prevent
|
||||
// data races.
|
||||
// Please see the comment on TransformFunc for more details.
|
||||
SetTransform(handler TransformFunc) error
|
||||
}
|
||||
|
||||
@@ -404,6 +401,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
||||
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: s.indexer,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
|
||||
cfg := &Config{
|
||||
@@ -568,7 +566,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(s, s.indexer, s.transform, deltas)
|
||||
return processDeltas(s, s.indexer, deltas)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
}
|
||||
|
||||
5
tools/cache/shared_informer_test.go
vendored
5
tools/cache/shared_informer_test.go
vendored
@@ -371,9 +371,8 @@ func TestSharedInformerTransformer(t *testing.T) {
|
||||
name := pod.GetName()
|
||||
|
||||
if upper := strings.ToUpper(name); upper != name {
|
||||
copied := pod.DeepCopyObject().(*v1.Pod)
|
||||
copied.SetName(upper)
|
||||
return copied, nil
|
||||
pod.SetName(upper)
|
||||
return pod, nil
|
||||
}
|
||||
}
|
||||
return obj, nil
|
||||
|
||||
@@ -181,22 +181,24 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
|
||||
return nil
|
||||
}
|
||||
isomorphicEvent.Series = &eventsv1.EventSeries{
|
||||
Count: 1,
|
||||
Count: 2,
|
||||
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
|
||||
}
|
||||
return isomorphicEvent
|
||||
// Make a copy of the Event to make sure that recording it
|
||||
// doesn't mess with the object stored in cache.
|
||||
return isomorphicEvent.DeepCopy()
|
||||
}
|
||||
e.eventCache[eventKey] = eventCopy
|
||||
return eventCopy
|
||||
// Make a copy of the Event to make sure that recording it doesn't
|
||||
// mess with the object stored in cache.
|
||||
return eventCopy.DeepCopy()
|
||||
}()
|
||||
if evToRecord != nil {
|
||||
recordedEvent := e.attemptRecording(evToRecord)
|
||||
if recordedEvent != nil {
|
||||
recordedEventKey := getKey(recordedEvent)
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.eventCache[recordedEventKey] = recordedEvent
|
||||
}
|
||||
// TODO: Add a metric counting the number of recording attempts
|
||||
e.attemptRecording(evToRecord)
|
||||
// We don't want the new recorded Event to be reflected in the
|
||||
// client's cache because server-side mutations could mess with the
|
||||
// aggregation mechanism used by the client.
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -248,6 +250,14 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
|
||||
return nil, false
|
||||
case *errors.StatusError:
|
||||
if errors.IsAlreadyExists(err) {
|
||||
// If we tried to create an Event from an EventSerie, it means that
|
||||
// the original Patch request failed because the Event we were
|
||||
// trying to patch didn't exist. If the creation failed because the
|
||||
// Event now exists, it is safe to retry. This occurs when a new
|
||||
// Event is emitted twice in a very short period of time.
|
||||
if isEventSeries {
|
||||
return nil, true
|
||||
}
|
||||
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
} else {
|
||||
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
|
||||
103
tools/events/event_broadcaster_test.go
Normal file
103
tools/events/event_broadcaster_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
eventsv1 "k8s.io/api/events/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
)
|
||||
|
||||
func TestRecordEventToSink(t *testing.T) {
|
||||
nonIsomorphicEvent := eventsv1.Event{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test",
|
||||
Namespace: metav1.NamespaceDefault,
|
||||
},
|
||||
Series: nil,
|
||||
}
|
||||
|
||||
isomorphicEvent := *nonIsomorphicEvent.DeepCopy()
|
||||
isomorphicEvent.Series = &eventsv1.EventSeries{Count: 2}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
eventsToRecord []eventsv1.Event
|
||||
expectedRecordedEvent eventsv1.Event
|
||||
}{
|
||||
{
|
||||
name: "record one Event",
|
||||
eventsToRecord: []eventsv1.Event{
|
||||
nonIsomorphicEvent,
|
||||
},
|
||||
expectedRecordedEvent: nonIsomorphicEvent,
|
||||
},
|
||||
{
|
||||
name: "record one Event followed by an isomorphic one",
|
||||
eventsToRecord: []eventsv1.Event{
|
||||
nonIsomorphicEvent,
|
||||
isomorphicEvent,
|
||||
},
|
||||
expectedRecordedEvent: isomorphicEvent,
|
||||
},
|
||||
{
|
||||
name: "record one isomorphic Event before the original",
|
||||
eventsToRecord: []eventsv1.Event{
|
||||
isomorphicEvent,
|
||||
nonIsomorphicEvent,
|
||||
},
|
||||
expectedRecordedEvent: isomorphicEvent,
|
||||
},
|
||||
{
|
||||
name: "record one isomorphic Event without one already existing",
|
||||
eventsToRecord: []eventsv1.Event{
|
||||
isomorphicEvent,
|
||||
},
|
||||
expectedRecordedEvent: isomorphicEvent,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
kubeClient := fake.NewSimpleClientset()
|
||||
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
|
||||
|
||||
for _, ev := range tc.eventsToRecord {
|
||||
recordEvent(eventSink, &ev)
|
||||
}
|
||||
|
||||
recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("expected to be able to list Events from fake client")
|
||||
}
|
||||
|
||||
if len(recordedEvents.Items) != 1 {
|
||||
t.Errorf("expected one Event to be recorded, found: %d", len(recordedEvents.Items))
|
||||
}
|
||||
|
||||
recordedEvent := recordedEvents.Items[0]
|
||||
if !reflect.DeepEqual(recordedEvent, tc.expectedRecordedEvent) {
|
||||
t.Errorf("expected to have recorded Event: %#+v, got: %#+v\n diff: %s", tc.expectedRecordedEvent, recordedEvent, diff.ObjectReflectDiff(tc.expectedRecordedEvent, recordedEvent))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -29,6 +30,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
fake "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
ref "k8s.io/client-go/tools/reference"
|
||||
@@ -106,7 +108,7 @@ func TestEventSeriesf(t *testing.T) {
|
||||
nonIsomorphicEvent := expectedEvent.DeepCopy()
|
||||
nonIsomorphicEvent.Action = "stopped"
|
||||
|
||||
expectedEvent.Series = &eventsv1.EventSeries{Count: 1}
|
||||
expectedEvent.Series = &eventsv1.EventSeries{Count: 2}
|
||||
table := []struct {
|
||||
regarding k8sruntime.Object
|
||||
related k8sruntime.Object
|
||||
@@ -182,6 +184,44 @@ func TestEventSeriesf(t *testing.T) {
|
||||
close(stopCh)
|
||||
}
|
||||
|
||||
// TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to
|
||||
// an EventSink consecutively there is no data race. This test is meant to be
|
||||
// run with the `-race` option.
|
||||
func TestEventSeriesWithEventSinkImplRace(t *testing.T) {
|
||||
kubeClient := fake.NewSimpleClientset()
|
||||
|
||||
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
|
||||
eventBroadcaster := NewBroadcaster(eventSink)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
eventBroadcaster.StartRecordingToSink(stopCh)
|
||||
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test")
|
||||
|
||||
recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
|
||||
recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
|
||||
|
||||
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
|
||||
events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(events.Items) != 1 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if events.Items[0].Series == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Serie")
|
||||
}
|
||||
}
|
||||
|
||||
func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
|
||||
recvEvent := *actualEvent
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
|
||||
|
||||
// If we use are reloading files, we need to handle certificate rotation properly
|
||||
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
|
||||
if config.TLS.ReloadTLSFiles {
|
||||
if config.TLS.ReloadTLSFiles && tlsConfig != nil && tlsConfig.GetClientCertificate != nil {
|
||||
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
|
||||
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
|
||||
dial = dynamicCertDialer.connDialer.DialContext
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"math/big"
|
||||
"net"
|
||||
"path/filepath"
|
||||
@@ -44,6 +45,7 @@ type Config struct {
|
||||
Organization []string
|
||||
AltNames AltNames
|
||||
Usages []x509.ExtKeyUsage
|
||||
NotBefore time.Time
|
||||
}
|
||||
|
||||
// AltNames contains the domain names and IP addresses that will be added
|
||||
@@ -57,14 +59,24 @@ type AltNames struct {
|
||||
// NewSelfSignedCACert creates a CA certificate
|
||||
func NewSelfSignedCACert(cfg Config, key crypto.Signer) (*x509.Certificate, error) {
|
||||
now := time.Now()
|
||||
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
|
||||
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serial = new(big.Int).Add(serial, big.NewInt(1))
|
||||
notBefore := now.UTC()
|
||||
if !cfg.NotBefore.IsZero() {
|
||||
notBefore = cfg.NotBefore.UTC()
|
||||
}
|
||||
tmpl := x509.Certificate{
|
||||
SerialNumber: new(big.Int).SetInt64(0),
|
||||
SerialNumber: serial,
|
||||
Subject: pkix.Name{
|
||||
CommonName: cfg.CommonName,
|
||||
Organization: cfg.Organization,
|
||||
},
|
||||
DNSNames: []string{cfg.CommonName},
|
||||
NotBefore: now.UTC(),
|
||||
NotBefore: notBefore,
|
||||
NotAfter: now.Add(duration365d * 10).UTC(),
|
||||
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
||||
BasicConstraintsValid: true,
|
||||
@@ -116,9 +128,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
|
||||
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
serial = new(big.Int).Add(serial, big.NewInt(1))
|
||||
caTemplate := x509.Certificate{
|
||||
SerialNumber: big.NewInt(1),
|
||||
SerialNumber: serial,
|
||||
Subject: pkix.Name{
|
||||
CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
|
||||
},
|
||||
@@ -144,9 +161,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
|
||||
serial, err = cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
serial = new(big.Int).Add(serial, big.NewInt(1))
|
||||
template := x509.Certificate{
|
||||
SerialNumber: big.NewInt(2),
|
||||
SerialNumber: serial,
|
||||
Subject: pkix.Name{
|
||||
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user