mirror of
https://github.com/kubernetes/client-go.git
synced 2026-05-15 11:43:33 +00:00
Compare commits
65 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d8ac07342 | ||
|
|
73fd2ddc91 | ||
|
|
e213ccc306 | ||
|
|
5f2132fc43 | ||
|
|
699a46738f | ||
|
|
b2f42092e3 | ||
|
|
539d9f1f03 | ||
|
|
637fc595d1 | ||
|
|
069aa55f4a | ||
|
|
44c2c549a5 | ||
|
|
96d4b4ca2c | ||
|
|
5abdc64bf8 | ||
|
|
e4cdb82809 | ||
|
|
3c67f637e2 | ||
|
|
935aad3790 | ||
|
|
f0e0a63eea | ||
|
|
3e37d3cec6 | ||
|
|
e85d69084a | ||
|
|
e4ab670a95 | ||
|
|
f6af94938b | ||
|
|
b34ddfd55a | ||
|
|
46aef4b64f | ||
|
|
0f8ad595c9 | ||
|
|
ea09349e3b | ||
|
|
f78d31a2bc | ||
|
|
1d2c64df00 | ||
|
|
0810080103 | ||
|
|
0976f4afe2 | ||
|
|
0673906c42 | ||
|
|
a2e13ccd49 | ||
|
|
127c8ce629 | ||
|
|
26011c05ab | ||
|
|
9bcc910985 | ||
|
|
3ed0010e30 | ||
|
|
c0015c0cc1 | ||
|
|
22ef81a898 | ||
|
|
7d76947ccd | ||
|
|
7615511cc0 | ||
|
|
a99ac36e1a | ||
|
|
424bde0e01 | ||
|
|
958c03a8dd | ||
|
|
9a62005bf9 | ||
|
|
0f3c6ef2dc | ||
|
|
4deb1a7db2 | ||
|
|
d64f019211 | ||
|
|
73e7057101 | ||
|
|
2c91b666c2 | ||
|
|
94836903b2 | ||
|
|
4ea001dc73 | ||
|
|
f7ff47eb37 | ||
|
|
45ef4fc0dc | ||
|
|
c428844852 | ||
|
|
a0a9a3744f | ||
|
|
fa64ba9235 | ||
|
|
ed62839de3 | ||
|
|
85e546b378 | ||
|
|
465fab1523 | ||
|
|
c076c6e063 | ||
|
|
171f146b90 | ||
|
|
98cbc2c6ac | ||
|
|
5551f5649d | ||
|
|
501db86fbe | ||
|
|
ce329ac04e | ||
|
|
dd7f3ad83f | ||
|
|
ba984c792f |
14
Godeps/Godeps.json
generated
14
Godeps/Godeps.json
generated
@@ -54,6 +54,10 @@
|
||||
"ImportPath": "github.com/google/btree",
|
||||
"Rev": "7d79101e329e"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/google/go-cmp",
|
||||
"Rev": "v0.3.0"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/google/gofuzz",
|
||||
"Rev": "24818f796faf"
|
||||
@@ -132,7 +136,7 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/net",
|
||||
"Rev": "65e2d4e15006"
|
||||
"Rev": "cdfb69ac37fc"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/oauth2",
|
||||
@@ -180,19 +184,19 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "gopkg.in/yaml.v2",
|
||||
"Rev": "v2.2.1"
|
||||
"Rev": "v2.2.4"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/api",
|
||||
"Rev": "e63b5755afac"
|
||||
"Rev": "v0.15.8"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery",
|
||||
"Rev": "ad85901afca0"
|
||||
"Rev": "v0.15.8"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/klog",
|
||||
"Rev": "v0.3.0"
|
||||
"Rev": "v0.3.1"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi",
|
||||
|
||||
@@ -172,7 +172,7 @@ func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
|
||||
if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
|
||||
if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Obj
|
||||
return err
|
||||
}
|
||||
|
||||
err = os.Chmod(f.Name(), 0755)
|
||||
err = os.Chmod(f.Name(), 0660)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ package disk
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -96,6 +97,32 @@ func TestNewCachedDiscoveryClient_TTL(t *testing.T) {
|
||||
assert.Equal(c.groupCalls, 2)
|
||||
}
|
||||
|
||||
func TestNewCachedDiscoveryClient_PathPerm(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
d, err := ioutil.TempDir("", "")
|
||||
assert.NoError(err)
|
||||
os.RemoveAll(d)
|
||||
defer os.RemoveAll(d)
|
||||
|
||||
c := fakeDiscoveryClient{}
|
||||
cdc := newCachedDiscoveryClient(&c, d, 1*time.Nanosecond)
|
||||
cdc.ServerGroups()
|
||||
|
||||
err = filepath.Walk(d, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
assert.Equal(os.FileMode(0750), info.Mode().Perm())
|
||||
} else {
|
||||
assert.Equal(os.FileMode(0660), info.Mode().Perm())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
type fakeDiscoveryClient struct {
|
||||
groupCalls int
|
||||
resourceCalls int
|
||||
|
||||
@@ -18,6 +18,7 @@ package disk
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/gregjones/httpcache"
|
||||
@@ -35,6 +36,8 @@ type cacheRoundTripper struct {
|
||||
// corresponding requests.
|
||||
func newCacheRoundTripper(cacheDir string, rt http.RoundTripper) http.RoundTripper {
|
||||
d := diskv.New(diskv.Options{
|
||||
PathPerm: os.FileMode(0750),
|
||||
FilePerm: os.FileMode(0660),
|
||||
BasePath: cacheDir,
|
||||
TempDir: filepath.Join(cacheDir, ".diskv-temp"),
|
||||
})
|
||||
|
||||
@@ -22,7 +22,10 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// copied from k8s.io/client-go/transport/round_trippers_test.go
|
||||
@@ -93,3 +96,52 @@ func TestCacheRoundTripper(t *testing.T) {
|
||||
t.Errorf("Invalid content read from cache %q", string(content))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheRoundTripperPathPerm(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
rt := &testRoundTripper{}
|
||||
cacheDir, err := ioutil.TempDir("", "cache-rt")
|
||||
os.RemoveAll(cacheDir)
|
||||
defer os.RemoveAll(cacheDir)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cache := newCacheRoundTripper(cacheDir, rt)
|
||||
|
||||
// First call, caches the response
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: &url.URL{Host: "localhost"},
|
||||
}
|
||||
rt.Response = &http.Response{
|
||||
Header: http.Header{"ETag": []string{`"123456"`}},
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("Content"))),
|
||||
StatusCode: http.StatusOK,
|
||||
}
|
||||
resp, err := cache.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
content, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(content) != "Content" {
|
||||
t.Errorf(`Expected Body to be "Content", got %q`, string(content))
|
||||
}
|
||||
|
||||
err = filepath.Walk(cacheDir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
assert.Equal(os.FileMode(0750), info.Mode().Perm())
|
||||
} else {
|
||||
assert.Equal(os.FileMode(0660), info.Mode().Perm())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultRe
|
||||
return &dynamicSharedInformerFactory{
|
||||
client: client,
|
||||
defaultResync: defaultResync,
|
||||
namespace: metav1.NamespaceAll,
|
||||
namespace: namespace,
|
||||
informers: map[schema.GroupVersionResource]informers.GenericInformer{},
|
||||
startedInformers: make(map[schema.GroupVersionResource]bool),
|
||||
tweakListOptions: tweakListOptions,
|
||||
|
||||
12
go.mod
12
go.mod
@@ -22,13 +22,13 @@ require (
|
||||
github.com/spf13/pflag v1.0.1
|
||||
github.com/stretchr/testify v1.2.2
|
||||
golang.org/x/crypto v0.0.0-20181025213731-e84da0312774
|
||||
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
|
||||
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc
|
||||
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a
|
||||
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d
|
||||
google.golang.org/appengine v1.5.0 // indirect
|
||||
k8s.io/api v0.0.0-20190511023547-e63b5755afac
|
||||
k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0
|
||||
k8s.io/klog v0.3.0
|
||||
k8s.io/api v0.15.8
|
||||
k8s.io/apimachinery v0.15.8
|
||||
k8s.io/klog v0.3.1
|
||||
k8s.io/utils v0.0.0-20190221042446-c2654d5206da
|
||||
sigs.k8s.io/yaml v1.1.0
|
||||
)
|
||||
@@ -37,6 +37,6 @@ replace (
|
||||
golang.org/x/sync => golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
|
||||
golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503
|
||||
golang.org/x/tools => golang.org/x/tools v0.0.0-20190313210603-aa82965741a9
|
||||
k8s.io/api => k8s.io/api v0.0.0-20190511023547-e63b5755afac
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0
|
||||
k8s.io/api => k8s.io/api v0.15.8
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.15.8
|
||||
)
|
||||
|
||||
17
go.sum
17
go.sum
@@ -22,8 +22,11 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/google/btree v0.0.0-20160524151835-7d79101e329e h1:JHB7F/4TJCrYBW8+GZO8VkWDj1jxcWuCl6uxKODiyi4=
|
||||
github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck=
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
|
||||
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
|
||||
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
|
||||
@@ -61,8 +64,8 @@ golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnf
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 h1:bfLnR+k0tq5Lqt6dflRLcZiz6UaXCMt3vhYJ1l4FQ80=
|
||||
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc h1:gkKoSkUmnU6bpS/VhkuO27bzQeSA51uaEfbOW5dNb68=
|
||||
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a h1:tImsplftrFpALCYumobsd0K86vlAs/eXGFms2txfJfA=
|
||||
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
|
||||
@@ -88,10 +91,12 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
k8s.io/api v0.0.0-20190511023547-e63b5755afac/go.mod h1:f265Ep4XvHtyggfaaNhtP0AtAFilBaXq2fPjm5kYxwI=
|
||||
k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0/go.mod h1:5CBnzrKYGHzv9ZsSKmQ8wHt4XI4/TUBPDwYM9FlZMyw=
|
||||
k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE=
|
||||
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
k8s.io/api v0.15.8/go.mod h1:hpDXsOhY/unVSSzhMol7kihWaNMf2snhF4nejjlzUzk=
|
||||
k8s.io/apimachinery v0.15.8/go.mod h1:Xc10RHc1U+F/e9GCloJ8QAeCGevSVP5xhOhqlE+e1kM=
|
||||
k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
|
||||
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
|
||||
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
|
||||
k8s.io/utils v0.0.0-20190221042446-c2654d5206da h1:ElyM7RPonbKnQqOcw7dG2IK5uvQQn3b/WPHqD5mBvP4=
|
||||
|
||||
98
kubernetes/typed/events/v1beta1/event_expansion.go
Normal file
98
kubernetes/typed/events/v1beta1/event_expansion.go
Normal file
@@ -0,0 +1,98 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1beta1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/events/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
// The EventExpansion interface allows manually adding extra methods to the EventInterface.
|
||||
// TODO: Add querying functions to the event expansion
|
||||
type EventExpansion interface {
|
||||
// CreateWithEventNamespace is the same as a Create
|
||||
// except that it sends the request to the event.Namespace.
|
||||
CreateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||
// UpdateWithEventNamespace is the same as a Update
|
||||
// except that it sends the request to the event.Namespace.
|
||||
UpdateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||
// PatchWithEventNamespace is the same as an Update
|
||||
// except that it sends the request to the event.Namespace.
|
||||
PatchWithEventNamespace(event *v1beta1.Event, data []byte) (*v1beta1.Event, error)
|
||||
}
|
||||
|
||||
// CreateWithEventNamespace makes a new event.
|
||||
// Returns the copy of the event the server returns, or an error.
|
||||
// The namespace to create the event within is deduced from the event.
|
||||
// it must either match this event client's namespace, or this event client must
|
||||
// have been created with the "" namespace.
|
||||
func (e *events) CreateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
if e.ns != "" && event.Namespace != e.ns {
|
||||
return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
|
||||
}
|
||||
result := &v1beta1.Event{}
|
||||
err := e.client.Post().
|
||||
NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
|
||||
Resource("events").
|
||||
Body(event).
|
||||
Do().
|
||||
Into(result)
|
||||
return result, err
|
||||
}
|
||||
|
||||
// UpdateWithEventNamespace modifies an existing event.
|
||||
// It returns the copy of the event that the server returns, or an error.
|
||||
// The namespace and key to update the event within is deduced from the event.
|
||||
// The namespace must either match this event client's namespace, or this event client must have been
|
||||
// created with the "" namespace.
|
||||
// Update also requires the ResourceVersion to be set in the event object.
|
||||
func (e *events) UpdateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
if e.ns != "" && event.Namespace != e.ns {
|
||||
return nil, fmt.Errorf("can't update an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
|
||||
}
|
||||
result := &v1beta1.Event{}
|
||||
err := e.client.Put().
|
||||
NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
|
||||
Resource("events").
|
||||
Name(event.Name).
|
||||
Body(event).
|
||||
Do().
|
||||
Into(result)
|
||||
return result, err
|
||||
}
|
||||
|
||||
// PatchWithEventNamespace modifies an existing event.
|
||||
// It returns the copy of the event that the server returns, or an error.
|
||||
// The namespace and name of the target event is deduced from the event.
|
||||
// The namespace must either match this event client's namespace, or this event client must
|
||||
// have been created with the "" namespace.
|
||||
func (e *events) PatchWithEventNamespace(event *v1beta1.Event, data []byte) (*v1beta1.Event, error) {
|
||||
if e.ns != "" && event.Namespace != e.ns {
|
||||
return nil, fmt.Errorf("can't patch an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
|
||||
}
|
||||
result := &v1beta1.Event{}
|
||||
err := e.client.Patch(types.StrategicMergePatchType).
|
||||
NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
|
||||
Resource("events").
|
||||
Name(event.Name).
|
||||
Body(data).
|
||||
Do().
|
||||
Into(result)
|
||||
return result, err
|
||||
}
|
||||
66
kubernetes/typed/events/v1beta1/fake/fake_event_expansion.go
Normal file
66
kubernetes/typed/events/v1beta1/fake/fake_event_expansion.go
Normal file
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package fake
|
||||
|
||||
import (
|
||||
v1beta1 "k8s.io/api/events/v1beta1"
|
||||
types "k8s.io/apimachinery/pkg/types"
|
||||
core "k8s.io/client-go/testing"
|
||||
)
|
||||
|
||||
// CreateWithEventNamespace creats a new event. Returns the copy of the event the server returns, or an error.
|
||||
func (c *FakeEvents) CreateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
action := core.NewRootCreateAction(eventsResource, event)
|
||||
if c.ns != "" {
|
||||
action = core.NewCreateAction(eventsResource, c.ns, event)
|
||||
}
|
||||
obj, err := c.Fake.Invokes(action, event)
|
||||
if obj == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj.(*v1beta1.Event), err
|
||||
}
|
||||
|
||||
// UpdateWithEventNamespace replaces an existing event. Returns the copy of the event the server returns, or an error.
|
||||
func (c *FakeEvents) UpdateWithEventNamespace(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
action := core.NewRootUpdateAction(eventsResource, event)
|
||||
if c.ns != "" {
|
||||
action = core.NewUpdateAction(eventsResource, c.ns, event)
|
||||
}
|
||||
obj, err := c.Fake.Invokes(action, event)
|
||||
if obj == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj.(*v1beta1.Event), err
|
||||
}
|
||||
|
||||
// PatchWithEventNamespace patches an existing event. Returns the copy of the event the server returns, or an error.
|
||||
func (c *FakeEvents) PatchWithEventNamespace(event *v1beta1.Event, data []byte) (*v1beta1.Event, error) {
|
||||
pt := types.StrategicMergePatchType
|
||||
action := core.NewRootPatchAction(eventsResource, event.Name, pt, data)
|
||||
if c.ns != "" {
|
||||
action = core.NewPatchAction(eventsResource, c.ns, event.Name, pt, data)
|
||||
}
|
||||
obj, err := c.Fake.Invokes(action, event)
|
||||
if obj == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj.(*v1beta1.Event), err
|
||||
}
|
||||
@@ -17,5 +17,3 @@ limitations under the License.
|
||||
// Code generated by client-gen. DO NOT EDIT.
|
||||
|
||||
package v1beta1
|
||||
|
||||
type EventExpansion interface{}
|
||||
|
||||
@@ -287,7 +287,7 @@ func (ts *azureTokenSource) refreshToken(token *azureToken) (*azureToken, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
oauthConfig, err := adal.NewOAuthConfig(env.ActiveDirectoryEndpoint, token.tenantID)
|
||||
oauthConfig, err := adal.NewOAuthConfigWithAPIVersion(env.ActiveDirectoryEndpoint, token.tenantID, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("building the OAuth configuration for token refresh: %v", err)
|
||||
}
|
||||
@@ -344,7 +344,7 @@ func newAzureTokenSourceDeviceCode(environment azure.Environment, clientID strin
|
||||
}
|
||||
|
||||
func (ts *azureTokenSourceDeviceCode) Token() (*azureToken, error) {
|
||||
oauthConfig, err := adal.NewOAuthConfig(ts.environment.ActiveDirectoryEndpoint, ts.tenantID)
|
||||
oauthConfig, err := adal.NewOAuthConfigWithAPIVersion(ts.environment.ActiveDirectoryEndpoint, ts.tenantID, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("building the OAuth configuration for device code authentication: %v", err)
|
||||
}
|
||||
|
||||
@@ -74,9 +74,10 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
},
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
BearerToken: c.BearerToken,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
BearerToken: c.BearerToken,
|
||||
BearerTokenFile: c.BearerTokenFile,
|
||||
Impersonate: transport.ImpersonationConfig{
|
||||
UserName: c.Impersonate.UserName,
|
||||
Groups: c.Impersonate.Groups,
|
||||
|
||||
@@ -105,7 +105,7 @@ func LoadFromFile(path string) (*Info, error) {
|
||||
// The fields of client.Config with a corresponding field in the Info are set
|
||||
// with the value from the Info.
|
||||
func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error) {
|
||||
var config restclient.Config = c
|
||||
var config = c
|
||||
config.Username = info.User
|
||||
config.Password = info.Password
|
||||
config.CAFile = info.CAFile
|
||||
@@ -118,6 +118,7 @@ func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error)
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// Complete returns true if the Kubernetes API authorization info is complete.
|
||||
func (info Info) Complete() bool {
|
||||
return len(info.User) > 0 ||
|
||||
len(info.CertFile) > 0 ||
|
||||
|
||||
14
tools/cache/delta_fifo.go
vendored
14
tools/cache/delta_fifo.go
vendored
@@ -295,13 +295,6 @@ func isDeletionDup(a, b *Delta) *Delta {
|
||||
return b
|
||||
}
|
||||
|
||||
// willObjectBeDeletedLocked returns true only if the last delta for the
|
||||
// given object is Delete. Caller must lock first.
|
||||
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
|
||||
deltas := f.items[id]
|
||||
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
|
||||
}
|
||||
|
||||
// queueActionLocked appends to the delta list for the object.
|
||||
// Caller must lock first.
|
||||
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
|
||||
@@ -310,13 +303,6 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
|
||||
// If object is supposed to be deleted (last event is Deleted),
|
||||
// then we should ignore Sync events, because it would result in
|
||||
// recreation of this object.
|
||||
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
|
||||
return nil
|
||||
}
|
||||
|
||||
newDeltas := append(f.items[id], Delta{actionType, obj})
|
||||
newDeltas = dedupDeltas(newDeltas)
|
||||
|
||||
|
||||
27
tools/cache/delta_fifo_test.go
vendored
27
tools/cache/delta_fifo_test.go
vendored
@@ -85,6 +85,33 @@ func TestDeltaFIFO_basic(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeltaFIFO_replaceWithDeleteDeltaIn tests that a `Sync` delta for an
|
||||
// object `O` with ID `X` is added when .Replace is called and `O` is among the
|
||||
// replacement objects even if the DeltaFIFO already stores in terminal position
|
||||
// a delta of type `Delete` for ID `X`. Not adding the `Sync` delta causes
|
||||
// SharedIndexInformers to miss `O`'s create notification, see https://github.com/kubernetes/kubernetes/issues/83810
|
||||
// for more details.
|
||||
func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
|
||||
oldObj := mkFifoObj("foo", 1)
|
||||
newObj := mkFifoObj("foo", 2)
|
||||
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{oldObj}
|
||||
}))
|
||||
|
||||
f.Delete(oldObj)
|
||||
f.Replace([]interface{}{newObj}, "")
|
||||
|
||||
actualDeltas := Pop(f)
|
||||
expectedDeltas := Deltas{
|
||||
Delta{Type: Deleted, Object: oldObj},
|
||||
Delta{Type: Sync, Object: newObj},
|
||||
}
|
||||
if !reflect.DeepEqual(expectedDeltas, actualDeltas) {
|
||||
t.Errorf("expected %#v, got %#v", expectedDeltas, actualDeltas)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
|
||||
|
||||
2
tools/cache/reflector.go
vendored
2
tools/cache/reflector.go
vendored
@@ -83,7 +83,7 @@ var (
|
||||
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
|
||||
// The indexer is configured to key on namespace
|
||||
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
|
||||
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
|
||||
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
|
||||
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
|
||||
return indexer, reflector
|
||||
}
|
||||
|
||||
@@ -228,6 +228,7 @@ func (config *DirectClientConfig) getUserIdentificationPartialConfig(configAuthI
|
||||
// blindly overwrite existing values based on precedence
|
||||
if len(configAuthInfo.Token) > 0 {
|
||||
mergedConfig.BearerToken = configAuthInfo.Token
|
||||
mergedConfig.BearerTokenFile = configAuthInfo.TokenFile
|
||||
} else if len(configAuthInfo.TokenFile) > 0 {
|
||||
tokenBytes, err := ioutil.ReadFile(configAuthInfo.TokenFile)
|
||||
if err != nil {
|
||||
@@ -489,8 +490,9 @@ func (config *inClusterClientConfig) ClientConfig() (*restclient.Config, error)
|
||||
if server := config.overrides.ClusterInfo.Server; len(server) > 0 {
|
||||
icc.Host = server
|
||||
}
|
||||
if token := config.overrides.AuthInfo.Token; len(token) > 0 {
|
||||
icc.BearerToken = token
|
||||
if len(config.overrides.AuthInfo.Token) > 0 || len(config.overrides.AuthInfo.TokenFile) > 0 {
|
||||
icc.BearerToken = config.overrides.AuthInfo.Token
|
||||
icc.BearerTokenFile = config.overrides.AuthInfo.TokenFile
|
||||
}
|
||||
if certificateAuthorityFile := config.overrides.ClusterInfo.CertificateAuthority; len(certificateAuthorityFile) > 0 {
|
||||
icc.TLSClientConfig.CAFile = certificateAuthorityFile
|
||||
|
||||
@@ -548,6 +548,30 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
overrides: &ConfigOverrides{
|
||||
ClusterInfo: clientcmdapi.Cluster{
|
||||
Server: "https://host-from-overrides.com",
|
||||
CertificateAuthority: "/path/to/ca-from-overrides.crt",
|
||||
},
|
||||
AuthInfo: clientcmdapi.AuthInfo{
|
||||
Token: "token-from-override",
|
||||
TokenFile: "tokenfile-from-override",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
overrides: &ConfigOverrides{
|
||||
ClusterInfo: clientcmdapi.Cluster{
|
||||
Server: "https://host-from-overrides.com",
|
||||
CertificateAuthority: "/path/to/ca-from-overrides.crt",
|
||||
},
|
||||
AuthInfo: clientcmdapi.AuthInfo{
|
||||
Token: "",
|
||||
TokenFile: "tokenfile-from-override",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
overrides: &ConfigOverrides{},
|
||||
},
|
||||
@@ -556,13 +580,15 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
for _, tc := range tt {
|
||||
expectedServer := "https://host-from-cluster.com"
|
||||
expectedToken := "token-from-cluster"
|
||||
expectedTokenFile := "tokenfile-from-cluster"
|
||||
expectedCAFile := "/path/to/ca-from-cluster.crt"
|
||||
|
||||
icc := &inClusterClientConfig{
|
||||
inClusterConfigProvider: func() (*restclient.Config, error) {
|
||||
return &restclient.Config{
|
||||
Host: expectedServer,
|
||||
BearerToken: expectedToken,
|
||||
Host: expectedServer,
|
||||
BearerToken: expectedToken,
|
||||
BearerTokenFile: expectedTokenFile,
|
||||
TLSClientConfig: restclient.TLSClientConfig{
|
||||
CAFile: expectedCAFile,
|
||||
},
|
||||
@@ -579,8 +605,9 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
if overridenServer := tc.overrides.ClusterInfo.Server; len(overridenServer) > 0 {
|
||||
expectedServer = overridenServer
|
||||
}
|
||||
if overridenToken := tc.overrides.AuthInfo.Token; len(overridenToken) > 0 {
|
||||
expectedToken = overridenToken
|
||||
if len(tc.overrides.AuthInfo.Token) > 0 || len(tc.overrides.AuthInfo.TokenFile) > 0 {
|
||||
expectedToken = tc.overrides.AuthInfo.Token
|
||||
expectedTokenFile = tc.overrides.AuthInfo.TokenFile
|
||||
}
|
||||
if overridenCAFile := tc.overrides.ClusterInfo.CertificateAuthority; len(overridenCAFile) > 0 {
|
||||
expectedCAFile = overridenCAFile
|
||||
@@ -592,6 +619,9 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
if clientConfig.BearerToken != expectedToken {
|
||||
t.Errorf("Expected token %v, got %v", expectedToken, clientConfig.BearerToken)
|
||||
}
|
||||
if clientConfig.BearerTokenFile != expectedTokenFile {
|
||||
t.Errorf("Expected tokenfile %v, got %v", expectedTokenFile, clientConfig.BearerTokenFile)
|
||||
}
|
||||
if clientConfig.TLSClientConfig.CAFile != expectedCAFile {
|
||||
t.Errorf("Expected Certificate Authority %v, got %v", expectedCAFile, clientConfig.TLSClientConfig.CAFile)
|
||||
}
|
||||
|
||||
312
tools/events/event_broadcaster.go
Normal file
312
tools/events/event_broadcaster.go
Normal file
@@ -0,0 +1,312 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
|
||||
"k8s.io/api/events/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
typedv1beta1 "k8s.io/client-go/kubernetes/typed/events/v1beta1"
|
||||
"k8s.io/client-go/tools/record/util"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
const (
|
||||
maxTriesPerEvent = 12
|
||||
finishTime = 6 * time.Minute
|
||||
refreshTime = 30 * time.Minute
|
||||
maxQueuedEvents = 1000
|
||||
)
|
||||
|
||||
var defaultSleepDuration = 10 * time.Second
|
||||
|
||||
// TODO: validate impact of copying and investigate hashing
|
||||
type eventKey struct {
|
||||
action string
|
||||
reason string
|
||||
reportingController string
|
||||
reportingInstance string
|
||||
regarding corev1.ObjectReference
|
||||
related corev1.ObjectReference
|
||||
}
|
||||
|
||||
type eventBroadcasterImpl struct {
|
||||
*watch.Broadcaster
|
||||
mu sync.Mutex
|
||||
eventCache map[eventKey]*v1beta1.Event
|
||||
sleepDuration time.Duration
|
||||
sink EventSink
|
||||
}
|
||||
|
||||
// EventSinkImpl wraps EventInterface to implement EventSink.
|
||||
// TODO: this makes it easier for testing purpose and masks the logic of performing API calls.
|
||||
// Note that rollbacking to raw clientset should also be transparent.
|
||||
type EventSinkImpl struct {
|
||||
Interface typedv1beta1.EventInterface
|
||||
}
|
||||
|
||||
// Create is the same as CreateWithEventNamespace of the EventExpansion
|
||||
func (e *EventSinkImpl) Create(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
return e.Interface.CreateWithEventNamespace(event)
|
||||
}
|
||||
|
||||
// Update is the same as UpdateithEventNamespace of the EventExpansion
|
||||
func (e *EventSinkImpl) Update(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
return e.Interface.UpdateWithEventNamespace(event)
|
||||
}
|
||||
|
||||
// Patch is the same as PatchWithEventNamespace of the EventExpansion
|
||||
func (e *EventSinkImpl) Patch(event *v1beta1.Event, data []byte) (*v1beta1.Event, error) {
|
||||
return e.Interface.PatchWithEventNamespace(event, data)
|
||||
}
|
||||
|
||||
// NewBroadcaster Creates a new event broadcaster.
|
||||
func NewBroadcaster(sink EventSink) EventBroadcaster {
|
||||
return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*v1beta1.Event{})
|
||||
}
|
||||
|
||||
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
|
||||
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*v1beta1.Event) EventBroadcaster {
|
||||
return &eventBroadcasterImpl{
|
||||
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
||||
eventCache: eventCache,
|
||||
sleepDuration: sleepDuration,
|
||||
sink: sink,
|
||||
}
|
||||
}
|
||||
|
||||
// refreshExistingEventSeries refresh events TTL
|
||||
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
||||
// TODO: Investigate whether lock contention won't be a problem
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
for isomorphicKey, event := range e.eventCache {
|
||||
if event.Series != nil {
|
||||
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||
e.eventCache[isomorphicKey] = recordedEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// finishSeries checks if a series has ended and either:
|
||||
// - write final count to the apiserver
|
||||
// - delete a singleton event (i.e. series field is nil) from the cache
|
||||
func (e *eventBroadcasterImpl) finishSeries() {
|
||||
// TODO: Investigate whether lock contention won't be a problem
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
for isomorphicKey, event := range e.eventCache {
|
||||
eventSerie := event.Series
|
||||
if eventSerie != nil {
|
||||
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
|
||||
if _, retry := recordEvent(e.sink, event); !retry {
|
||||
delete(e.eventCache, isomorphicKey)
|
||||
}
|
||||
}
|
||||
} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
|
||||
delete(e.eventCache, isomorphicKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewRecorder returns an EventRecorder that records events with the given event source.
|
||||
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
|
||||
hostname, _ := os.Hostname()
|
||||
reportingInstance := reportingController + "-" + hostname
|
||||
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
|
||||
}
|
||||
|
||||
func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
eventCopy := event.DeepCopy()
|
||||
go func() {
|
||||
evToRecord := func() *v1beta1.Event {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
eventKey := getKey(eventCopy)
|
||||
isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
|
||||
if isIsomorphic {
|
||||
if isomorphicEvent.Series != nil {
|
||||
isomorphicEvent.Series.Count++
|
||||
isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
|
||||
return nil
|
||||
}
|
||||
isomorphicEvent.Series = &v1beta1.EventSeries{
|
||||
Count: 1,
|
||||
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
|
||||
}
|
||||
return isomorphicEvent
|
||||
}
|
||||
e.eventCache[eventKey] = eventCopy
|
||||
return eventCopy
|
||||
}()
|
||||
if evToRecord != nil {
|
||||
recordedEvent := e.attemptRecording(evToRecord)
|
||||
if recordedEvent != nil {
|
||||
recordedEventKey := getKey(recordedEvent)
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.eventCache[recordedEventKey] = recordedEvent
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event {
|
||||
tries := 0
|
||||
for {
|
||||
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||
return recordedEvent
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
return nil
|
||||
}
|
||||
// Randomize sleep so that various clients won't all be
|
||||
// synced up if the master goes down.
|
||||
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
|
||||
}
|
||||
}
|
||||
|
||||
func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
|
||||
var newEvent *v1beta1.Event
|
||||
var err error
|
||||
isEventSeries := event.Series != nil
|
||||
if isEventSeries {
|
||||
patch, err := createPatchBytesForSeries(event)
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to calculate diff, no merge is possible: %v", err)
|
||||
return nil, false
|
||||
}
|
||||
newEvent, err = sink.Patch(event, patch)
|
||||
}
|
||||
// Update can fail because the event may have been removed and it no longer exists.
|
||||
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
|
||||
// Making sure that ResourceVersion is empty on creation
|
||||
event.ResourceVersion = ""
|
||||
newEvent, err = sink.Create(event)
|
||||
}
|
||||
if err == nil {
|
||||
return newEvent, false
|
||||
}
|
||||
// If we can't contact the server, then hold everything while we keep trying.
|
||||
// Otherwise, something about the event is malformed and we should abandon it.
|
||||
switch err.(type) {
|
||||
case *restclient.RequestConstructionError:
|
||||
// We will construct the request the same next time, so don't keep trying.
|
||||
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
|
||||
return nil, false
|
||||
case *errors.StatusError:
|
||||
if errors.IsAlreadyExists(err) {
|
||||
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
} else {
|
||||
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
}
|
||||
return nil, false
|
||||
case *errors.UnexpectedObjectError:
|
||||
// We don't expect this; it implies the server's response didn't match a
|
||||
// known pattern. Go ahead and retry.
|
||||
default:
|
||||
// This case includes actual http transport errors. Go ahead and retry.
|
||||
}
|
||||
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
|
||||
return nil, true
|
||||
}
|
||||
|
||||
func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) {
|
||||
oldEvent := event.DeepCopy()
|
||||
oldEvent.Series = nil
|
||||
oldData, err := json.Marshal(oldEvent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newData, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{})
|
||||
}
|
||||
|
||||
func getKey(event *v1beta1.Event) eventKey {
|
||||
key := eventKey{
|
||||
action: event.Action,
|
||||
reason: event.Reason,
|
||||
reportingController: event.ReportingController,
|
||||
reportingInstance: event.ReportingInstance,
|
||||
regarding: event.Regarding,
|
||||
}
|
||||
if event.Related != nil {
|
||||
key.related = *event.Related
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// startEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||
// The return value is used to stop recording
|
||||
func (e *eventBroadcasterImpl) startEventWatcher(eventHandler func(event runtime.Object)) func() {
|
||||
watcher := e.Watch()
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
for {
|
||||
watchEvent, ok := <-watcher.ResultChan()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
eventHandler(watchEvent.Object)
|
||||
}
|
||||
}()
|
||||
return watcher.Stop
|
||||
}
|
||||
|
||||
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
|
||||
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
|
||||
go wait.Until(func() {
|
||||
e.refreshExistingEventSeries()
|
||||
}, refreshTime, stopCh)
|
||||
go wait.Until(func() {
|
||||
e.finishSeries()
|
||||
}, finishTime, stopCh)
|
||||
eventHandler := func(obj runtime.Object) {
|
||||
event, ok := obj.(*v1beta1.Event)
|
||||
if !ok {
|
||||
klog.Errorf("unexpected type, expected v1beta1.Event")
|
||||
return
|
||||
}
|
||||
e.recordToSink(event, clock.RealClock{})
|
||||
}
|
||||
stopWatcher := e.startEventWatcher(eventHandler)
|
||||
go func() {
|
||||
<-stopCh
|
||||
stopWatcher()
|
||||
}()
|
||||
}
|
||||
92
tools/events/event_recorder.go
Normal file
92
tools/events/event_recorder.go
Normal file
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/reference"
|
||||
|
||||
"k8s.io/api/events/v1beta1"
|
||||
"k8s.io/client-go/tools/record/util"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
type recorderImpl struct {
|
||||
scheme *runtime.Scheme
|
||||
reportingController string
|
||||
reportingInstance string
|
||||
*watch.Broadcaster
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
|
||||
timestamp := metav1.MicroTime{time.Now()}
|
||||
message := fmt.Sprintf(note, args...)
|
||||
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
|
||||
if err != nil {
|
||||
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
|
||||
return
|
||||
}
|
||||
refRelated, err := reference.GetReference(recorder.scheme, related)
|
||||
if err != nil {
|
||||
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'.", related, err)
|
||||
}
|
||||
if !util.ValidateEventType(eventtype) {
|
||||
klog.Errorf("Unsupported event type: '%v'", eventtype)
|
||||
return
|
||||
}
|
||||
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
recorder.Action(watch.Added, event)
|
||||
}()
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *v1beta1.Event {
|
||||
t := metav1.Time{Time: recorder.clock.Now()}
|
||||
namespace := refRegarding.Namespace
|
||||
if namespace == "" {
|
||||
namespace = metav1.NamespaceSystem
|
||||
}
|
||||
return &v1beta1.Event{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
|
||||
Namespace: namespace,
|
||||
},
|
||||
EventTime: timestamp,
|
||||
Series: nil,
|
||||
ReportingController: reportingController,
|
||||
ReportingInstance: reportingInstance,
|
||||
Action: action,
|
||||
Reason: reason,
|
||||
Regarding: *refRegarding,
|
||||
Related: refRelated,
|
||||
Note: message,
|
||||
Type: eventtype,
|
||||
// TODO: remove this when we change conversion to convert eventSource
|
||||
// to reportingController
|
||||
DeprecatedSource: v1.EventSource{Component: reportingController},
|
||||
}
|
||||
}
|
||||
349
tools/events/eventseries_test.go
Normal file
349
tools/events/eventseries_test.go
Normal file
@@ -0,0 +1,349 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/events/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
ref "k8s.io/client-go/tools/reference"
|
||||
)
|
||||
|
||||
type testEventSeriesSink struct {
|
||||
OnCreate func(e *v1beta1.Event) (*v1beta1.Event, error)
|
||||
OnUpdate func(e *v1beta1.Event) (*v1beta1.Event, error)
|
||||
OnPatch func(e *v1beta1.Event, p []byte) (*v1beta1.Event, error)
|
||||
}
|
||||
|
||||
// Create records the event for testing.
|
||||
func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
if t.OnCreate != nil {
|
||||
return t.OnCreate(e)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// Update records the event for testing.
|
||||
func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
if t.OnUpdate != nil {
|
||||
return t.OnUpdate(e)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// Patch records the event for testing.
|
||||
func (t *testEventSeriesSink) Patch(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) {
|
||||
if t.OnPatch != nil {
|
||||
return t.OnPatch(e, p)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func TestEventSeriesf(t *testing.T) {
|
||||
hostname, _ := os.Hostname()
|
||||
|
||||
testPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
SelfLink: "/api/version/pods/foo",
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
},
|
||||
}
|
||||
|
||||
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expectedEvent := &v1beta1.Event{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
},
|
||||
EventTime: metav1.MicroTime{time.Now()},
|
||||
ReportingController: "eventTest",
|
||||
ReportingInstance: "eventTest-" + hostname,
|
||||
Action: "started",
|
||||
Reason: "test",
|
||||
Regarding: *regarding,
|
||||
Related: related,
|
||||
Note: "some verbose message: 1",
|
||||
Type: v1.EventTypeNormal,
|
||||
}
|
||||
|
||||
isomorphicEvent := expectedEvent.DeepCopy()
|
||||
|
||||
nonIsomorphicEvent := expectedEvent.DeepCopy()
|
||||
nonIsomorphicEvent.Action = "stopped"
|
||||
|
||||
expectedEvent.Series = &v1beta1.EventSeries{Count: 1}
|
||||
table := []struct {
|
||||
regarding k8sruntime.Object
|
||||
related k8sruntime.Object
|
||||
actual *v1beta1.Event
|
||||
elements []interface{}
|
||||
expect *v1beta1.Event
|
||||
expectUpdate bool
|
||||
}{
|
||||
{
|
||||
regarding: regarding,
|
||||
related: related,
|
||||
actual: isomorphicEvent,
|
||||
elements: []interface{}{1},
|
||||
expect: expectedEvent,
|
||||
expectUpdate: true,
|
||||
},
|
||||
{
|
||||
regarding: regarding,
|
||||
related: related,
|
||||
actual: nonIsomorphicEvent,
|
||||
elements: []interface{}{1},
|
||||
expect: nonIsomorphicEvent,
|
||||
expectUpdate: false,
|
||||
},
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
createEvent := make(chan *v1beta1.Event)
|
||||
updateEvent := make(chan *v1beta1.Event)
|
||||
patchEvent := make(chan *v1beta1.Event)
|
||||
|
||||
testEvents := testEventSeriesSink{
|
||||
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
createEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
updateEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||
// event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here
|
||||
// we'll use it directly
|
||||
patchEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
}
|
||||
eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*v1beta1.Event{})
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
|
||||
eventBroadcaster.StartRecordingToSink(stopCh)
|
||||
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
|
||||
// read from the chan as this was needed only to populate the cache
|
||||
<-createEvent
|
||||
for index, item := range table {
|
||||
actual := item.actual
|
||||
recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
|
||||
// validate event
|
||||
if item.expectUpdate {
|
||||
actualEvent := <-patchEvent
|
||||
t.Logf("%v - validating event affected by patch request", index)
|
||||
validateEventSerie(strconv.Itoa(index), true, actualEvent, item.expect, t)
|
||||
} else {
|
||||
actualEvent := <-createEvent
|
||||
t.Logf("%v - validating event affected by a create request", index)
|
||||
validateEventSerie(strconv.Itoa(index), false, actualEvent, item.expect, t)
|
||||
}
|
||||
}
|
||||
close(stopCh)
|
||||
}
|
||||
|
||||
func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *v1beta1.Event, expectedEvent *v1beta1.Event, t *testing.T) {
|
||||
recvEvent := *actualEvent
|
||||
|
||||
// Just check that the timestamp was set.
|
||||
if recvEvent.EventTime.IsZero() {
|
||||
t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
|
||||
}
|
||||
|
||||
if expectedUpdate {
|
||||
if recvEvent.Series == nil {
|
||||
t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series)
|
||||
|
||||
} else {
|
||||
if recvEvent.Series.Count != expectedEvent.Series.Count {
|
||||
t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
|
||||
}
|
||||
}
|
||||
|
||||
// Check that name has the right prefix.
|
||||
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
|
||||
t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
|
||||
}
|
||||
} else {
|
||||
if recvEvent.Series != nil {
|
||||
t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestFinishSeries(t *testing.T) {
|
||||
hostname, _ := os.Hostname()
|
||||
testPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
SelfLink: "/api/version/pods/foo",
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
},
|
||||
}
|
||||
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
|
||||
|
||||
createEvent := make(chan *v1beta1.Event, 10)
|
||||
updateEvent := make(chan *v1beta1.Event, 10)
|
||||
patchEvent := make(chan *v1beta1.Event, 10)
|
||||
testEvents := testEventSeriesSink{
|
||||
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
createEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
updateEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||
// event we receive is already patched, usually the sink uses it
|
||||
// only to retrieve the name and namespace, here we'll use it directly
|
||||
patchEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
}
|
||||
cache := map[eventKey]*v1beta1.Event{}
|
||||
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
||||
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
||||
nonFinishedEvent := cachedEvent.DeepCopy()
|
||||
nonFinishedEvent.ReportingController = "nonFinished-controller"
|
||||
cachedEvent.Series = &v1beta1.EventSeries{
|
||||
Count: 10,
|
||||
LastObservedTime: LastObservedTime,
|
||||
}
|
||||
cache[getKey(cachedEvent)] = cachedEvent
|
||||
cache[getKey(nonFinishedEvent)] = nonFinishedEvent
|
||||
eventBroadcaster.finishSeries()
|
||||
select {
|
||||
case actualEvent := <-patchEvent:
|
||||
t.Logf("validating event affected by patch request")
|
||||
eventBroadcaster.mu.Lock()
|
||||
defer eventBroadcaster.mu.Unlock()
|
||||
if len(cache) != 1 {
|
||||
t.Errorf("cache should be empty, but instead got a size of %v", len(cache))
|
||||
}
|
||||
if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
|
||||
t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
|
||||
}
|
||||
// check that we emitted only one event
|
||||
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
||||
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRefreshExistingEventSeries(t *testing.T) {
|
||||
hostname, _ := os.Hostname()
|
||||
testPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
SelfLink: "/api/version/pods/foo",
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
},
|
||||
}
|
||||
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
|
||||
|
||||
createEvent := make(chan *v1beta1.Event, 10)
|
||||
updateEvent := make(chan *v1beta1.Event, 10)
|
||||
patchEvent := make(chan *v1beta1.Event, 10)
|
||||
testEvents := testEventSeriesSink{
|
||||
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
createEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
updateEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||
// event we receive is already patched, usually the sink uses it
|
||||
//only to retrieve the name and namespace, here we'll use it directly.
|
||||
patchEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
}
|
||||
cache := map[eventKey]*v1beta1.Event{}
|
||||
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
||||
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
||||
cachedEvent.Series = &v1beta1.EventSeries{
|
||||
Count: 10,
|
||||
LastObservedTime: LastObservedTime,
|
||||
}
|
||||
cacheKey := getKey(cachedEvent)
|
||||
cache[cacheKey] = cachedEvent
|
||||
|
||||
eventBroadcaster.refreshExistingEventSeries()
|
||||
select {
|
||||
case <-patchEvent:
|
||||
t.Logf("validating event affected by patch request")
|
||||
eventBroadcaster.mu.Lock()
|
||||
defer eventBroadcaster.mu.Unlock()
|
||||
if len(cache) != 1 {
|
||||
t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
|
||||
}
|
||||
// check that we emitted only one event
|
||||
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
||||
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
58
tools/events/interfaces.go
Normal file
58
tools/events/interfaces.go
Normal file
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"k8s.io/api/events/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
// EventRecorder knows how to record events on behalf of an EventSource.
|
||||
type EventRecorder interface {
|
||||
// Eventf constructs an event from the given information and puts it in the queue for sending.
|
||||
// 'regarding' is the object this event is about. Event will make a reference-- or you may also
|
||||
// pass a reference to the object directly.
|
||||
// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers
|
||||
// a creation or deletion of related object.
|
||||
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
|
||||
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
|
||||
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
|
||||
// to automate handling of events, so imagine people writing switch statements to handle them.
|
||||
// You want to make that easy.
|
||||
// 'note' is intended to be human readable.
|
||||
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
|
||||
}
|
||||
|
||||
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
|
||||
type EventBroadcaster interface {
|
||||
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
|
||||
StartRecordingToSink(stopCh <-chan struct{})
|
||||
|
||||
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
|
||||
// with the event source set to the given event source.
|
||||
NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder
|
||||
}
|
||||
|
||||
// EventSink knows how to store events (client-go implements it.)
|
||||
// EventSink must respect the namespace that will be embedded in 'event'.
|
||||
// It is assumed that EventSink will return the same sorts of errors as
|
||||
// client-go's REST client.
|
||||
type EventSink interface {
|
||||
Create(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||
Update(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||
Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error)
|
||||
}
|
||||
@@ -16,12 +16,16 @@ limitations under the License.
|
||||
|
||||
// Package leaderelection implements leader election of a set of endpoints.
|
||||
// It uses an annotation in the endpoints object to store the record of the
|
||||
// election state.
|
||||
// election state. This implementation does not guarantee that only one
|
||||
// client is acting as a leader (a.k.a. fencing).
|
||||
//
|
||||
// This implementation does not guarantee that only one client is acting as a
|
||||
// leader (a.k.a. fencing). A client observes timestamps captured locally to
|
||||
// infer the state of the leader election. Thus the implementation is tolerant
|
||||
// to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate.
|
||||
// A client only acts on timestamps captured locally to infer the state of the
|
||||
// leader election. The client does not consider timestamps in the leader
|
||||
// election record to be accurate because these timestamps may not have been
|
||||
// produced by a local clock. The implemention does not depend on their
|
||||
// accuracy and only uses their change to indicate that another client has
|
||||
// renewed the leader lease. Thus the implementation is tolerant to arbitrary
|
||||
// clock skew, but is not tolerant to arbitrary clock skew rate.
|
||||
//
|
||||
// However the level of tolerance to skew rate can be configured by setting
|
||||
// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
|
||||
@@ -105,12 +109,26 @@ type LeaderElectionConfig struct {
|
||||
// LeaseDuration is the duration that non-leader candidates will
|
||||
// wait to force acquire leadership. This is measured against time of
|
||||
// last observed ack.
|
||||
//
|
||||
// A client needs to wait a full LeaseDuration without observing a change to
|
||||
// the record before it can attempt to take over. When all clients are
|
||||
// shutdown and a new set of clients are started with different names against
|
||||
// the same leader record, they must wait the full LeaseDuration before
|
||||
// attempting to acquire the lease. Thus LeaseDuration should be as short as
|
||||
// possible (within your tolerance for clock skew rate) to avoid a possible
|
||||
// long waits in the scenario.
|
||||
//
|
||||
// Core clients default this value to 15 seconds.
|
||||
LeaseDuration time.Duration
|
||||
// RenewDeadline is the duration that the acting master will retry
|
||||
// refreshing leadership before giving up.
|
||||
//
|
||||
// Core clients default this value to 10 seconds.
|
||||
RenewDeadline time.Duration
|
||||
// RetryPeriod is the duration the LeaderElector clients should wait
|
||||
// between tries of actions.
|
||||
//
|
||||
// Core clients default this value to 2 seconds.
|
||||
RetryPeriod time.Duration
|
||||
|
||||
// Callbacks are callbacks that are triggered during certain lifecycle
|
||||
|
||||
@@ -33,8 +33,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
// PortForwardProtocolV1Name is the subprotocol used for port forwarding.
|
||||
// TODO move to API machinery and re-unify with kubelet/server/portfoward
|
||||
// The subprotocol "portforward.k8s.io" is used for port forwarding.
|
||||
const PortForwardProtocolV1Name = "portforward.k8s.io"
|
||||
|
||||
// PortForwarder knows how to listen for local connections and forward them to
|
||||
@@ -401,6 +401,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops all listeners of PortForwarder.
|
||||
func (pf *PortForwarder) Close() {
|
||||
// stop all listeners
|
||||
for _, l := range pf.listeners {
|
||||
|
||||
@@ -153,7 +153,7 @@ func TestEventf(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[2]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -162,7 +162,7 @@ func TestEventf(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -181,7 +181,7 @@ func TestEventf(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
Reason: "Killed",
|
||||
Message: "some other verbose message: 1",
|
||||
@@ -189,7 +189,7 @@ func TestEventf(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -208,7 +208,7 @@ func TestEventf(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[2]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -217,7 +217,7 @@ func TestEventf(t *testing.T) {
|
||||
Count: 2,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: true,
|
||||
},
|
||||
{
|
||||
@@ -236,7 +236,7 @@ func TestEventf(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "differentUid",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[3]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -245,7 +245,7 @@ func TestEventf(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -264,7 +264,7 @@ func TestEventf(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[2]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -273,7 +273,7 @@ func TestEventf(t *testing.T) {
|
||||
Count: 3,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: true,
|
||||
},
|
||||
{
|
||||
@@ -292,7 +292,7 @@ func TestEventf(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "differentUid",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[3]",
|
||||
},
|
||||
Reason: "Stopped",
|
||||
@@ -301,7 +301,7 @@ func TestEventf(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -320,7 +320,7 @@ func TestEventf(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "differentUid",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[3]",
|
||||
},
|
||||
Reason: "Stopped",
|
||||
@@ -329,7 +329,7 @@ func TestEventf(t *testing.T) {
|
||||
Count: 2,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectUpdate: true,
|
||||
},
|
||||
}
|
||||
@@ -509,7 +509,7 @@ func TestLotsOfEvents(t *testing.T) {
|
||||
Name: fmt.Sprintf("foo-%v", i),
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
}
|
||||
// we need to vary the reason to prevent aggregation
|
||||
go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
|
||||
@@ -567,7 +567,7 @@ func TestEventfNoNamespace(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[2]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -576,7 +576,7 @@ func TestEventfNoNamespace(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
}
|
||||
@@ -677,7 +677,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[2]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -686,7 +686,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -705,7 +705,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
Reason: "Killed",
|
||||
Message: "some other verbose message: 1",
|
||||
@@ -713,7 +713,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -732,7 +732,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[2]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -741,7 +741,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Count: 2,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: true,
|
||||
},
|
||||
{
|
||||
@@ -760,7 +760,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "differentUid",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[3]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -769,7 +769,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -788,7 +788,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[2]",
|
||||
},
|
||||
Reason: "Started",
|
||||
@@ -797,7 +797,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Count: 3,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
|
||||
expectUpdate: true,
|
||||
},
|
||||
{
|
||||
@@ -816,7 +816,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "differentUid",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[3]",
|
||||
},
|
||||
Reason: "Stopped",
|
||||
@@ -825,7 +825,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Count: 1,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectUpdate: false,
|
||||
},
|
||||
{
|
||||
@@ -844,7 +844,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "differentUid",
|
||||
APIVersion: "version",
|
||||
APIVersion: "v1",
|
||||
FieldPath: "spec.containers[3]",
|
||||
},
|
||||
Reason: "Stopped",
|
||||
@@ -853,7 +853,7 @@ func TestMultiSinkCache(t *testing.T) {
|
||||
Count: 2,
|
||||
Type: v1.EventTypeNormal,
|
||||
},
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
|
||||
expectUpdate: true,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -19,8 +19,6 @@ package reference
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
@@ -30,8 +28,7 @@ import (
|
||||
|
||||
var (
|
||||
// Errors that could be returned by GetReference.
|
||||
ErrNilObject = errors.New("can't reference a nil object")
|
||||
ErrNoSelfLink = errors.New("selfLink was empty, can't make reference")
|
||||
ErrNilObject = errors.New("can't reference a nil object")
|
||||
)
|
||||
|
||||
// GetReference returns an ObjectReference which refers to the given
|
||||
@@ -47,20 +44,6 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
gvk := obj.GetObjectKind().GroupVersionKind()
|
||||
|
||||
// if the object referenced is actually persisted, we can just get kind from meta
|
||||
// if we are building an object reference to something not yet persisted, we should fallback to scheme
|
||||
kind := gvk.Kind
|
||||
if len(kind) == 0 {
|
||||
// TODO: this is wrong
|
||||
gvks, _, err := scheme.ObjectKinds(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
kind = gvks[0].Kind
|
||||
}
|
||||
|
||||
// An object that implements only List has enough metadata to build a reference
|
||||
var listMeta metav1.Common
|
||||
objectMeta, err := meta.Accessor(obj)
|
||||
@@ -73,29 +56,29 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
|
||||
listMeta = objectMeta
|
||||
}
|
||||
|
||||
// if the object referenced is actually persisted, we can also get version from meta
|
||||
version := gvk.GroupVersion().String()
|
||||
if len(version) == 0 {
|
||||
selfLink := listMeta.GetSelfLink()
|
||||
if len(selfLink) == 0 {
|
||||
return nil, ErrNoSelfLink
|
||||
}
|
||||
selfLinkUrl, err := url.Parse(selfLink)
|
||||
gvk := obj.GetObjectKind().GroupVersionKind()
|
||||
|
||||
// If object meta doesn't contain data about kind and/or version,
|
||||
// we are falling back to scheme.
|
||||
//
|
||||
// TODO: This doesn't work for CRDs, which are not registered in scheme.
|
||||
if gvk.Empty() {
|
||||
gvks, _, err := scheme.ObjectKinds(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// example paths: /<prefix>/<version>/*
|
||||
parts := strings.Split(selfLinkUrl.Path, "/")
|
||||
if len(parts) < 4 {
|
||||
return nil, fmt.Errorf("unexpected self link format: '%v'; got version '%v'", selfLink, version)
|
||||
}
|
||||
if parts[1] == "api" {
|
||||
version = parts[2]
|
||||
} else {
|
||||
version = parts[2] + "/" + parts[3]
|
||||
if len(gvks) == 0 || gvks[0].Empty() {
|
||||
return nil, fmt.Errorf("unexpected gvks registered for object %T: %v", obj, gvks)
|
||||
}
|
||||
// TODO: The same object can be registered for multiple group versions
|
||||
// (although in practise this doesn't seem to be used).
|
||||
// In such case, the version set may not be correct.
|
||||
gvk = gvks[0]
|
||||
}
|
||||
|
||||
kind := gvk.Kind
|
||||
version := gvk.GroupVersion().String()
|
||||
|
||||
// only has list metadata
|
||||
if objectMeta == nil {
|
||||
return &v1.ObjectReference{
|
||||
|
||||
@@ -37,29 +37,31 @@ func TestGetReferenceRefVersion(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input *TestRuntimeObj
|
||||
groupVersion schema.GroupVersion
|
||||
expectedRefVersion string
|
||||
}{
|
||||
{
|
||||
name: "api from selflink",
|
||||
name: "v1 GV from scheme",
|
||||
input: &TestRuntimeObj{
|
||||
ObjectMeta: metav1.ObjectMeta{SelfLink: "/api/v1/namespaces"},
|
||||
ObjectMeta: metav1.ObjectMeta{SelfLink: "/bad-selflink/unused"},
|
||||
},
|
||||
groupVersion: schema.GroupVersion{Group: "", Version: "v1"},
|
||||
expectedRefVersion: "v1",
|
||||
},
|
||||
{
|
||||
name: "foo.group/v3 from selflink",
|
||||
name: "foo.group/v3 GV from scheme",
|
||||
input: &TestRuntimeObj{
|
||||
ObjectMeta: metav1.ObjectMeta{SelfLink: "/apis/foo.group/v3/namespaces"},
|
||||
ObjectMeta: metav1.ObjectMeta{SelfLink: "/bad-selflink/unused"},
|
||||
},
|
||||
groupVersion: schema.GroupVersion{Group: "foo.group", Version: "v3"},
|
||||
expectedRefVersion: "foo.group/v3",
|
||||
},
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
scheme.AddKnownTypes(schema.GroupVersion{Group: "this", Version: "is ignored"}, &TestRuntimeObj{})
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
scheme := runtime.NewScheme()
|
||||
scheme.AddKnownTypes(test.groupVersion, &TestRuntimeObj{})
|
||||
ref, err := GetReference(scheme, test.input)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
@@ -18,42 +18,86 @@ package watch
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
func newTicketer() *ticketer {
|
||||
return &ticketer{
|
||||
func newEventProcessor(out chan<- watch.Event) *eventProcessor {
|
||||
return &eventProcessor{
|
||||
out: out,
|
||||
cond: sync.NewCond(&sync.Mutex{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
type ticketer struct {
|
||||
counter uint64
|
||||
// eventProcessor buffers events and writes them to an out chan when a reader
|
||||
// is waiting. Because of the requirement to buffer events, it synchronizes
|
||||
// input with a condition, and synchronizes output with a channels. It needs to
|
||||
// be able to yield while both waiting on an input condition and while blocked
|
||||
// on writing to the output channel.
|
||||
type eventProcessor struct {
|
||||
out chan<- watch.Event
|
||||
|
||||
cond *sync.Cond
|
||||
current uint64
|
||||
cond *sync.Cond
|
||||
buff []watch.Event
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (t *ticketer) GetTicket() uint64 {
|
||||
// -1 to start from 0
|
||||
return atomic.AddUint64(&t.counter, 1) - 1
|
||||
func (e *eventProcessor) run() {
|
||||
for {
|
||||
batch := e.takeBatch()
|
||||
e.writeBatch(batch)
|
||||
if e.stopped() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
|
||||
t.cond.L.Lock()
|
||||
defer t.cond.L.Unlock()
|
||||
for ticket != t.current {
|
||||
t.cond.Wait()
|
||||
func (e *eventProcessor) takeBatch() []watch.Event {
|
||||
e.cond.L.Lock()
|
||||
defer e.cond.L.Unlock()
|
||||
|
||||
for len(e.buff) == 0 && !e.stopped() {
|
||||
e.cond.Wait()
|
||||
}
|
||||
|
||||
f()
|
||||
batch := e.buff
|
||||
e.buff = nil
|
||||
return batch
|
||||
}
|
||||
|
||||
t.current++
|
||||
t.cond.Broadcast()
|
||||
func (e *eventProcessor) writeBatch(events []watch.Event) {
|
||||
for _, event := range events {
|
||||
select {
|
||||
case e.out <- event:
|
||||
case <-e.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *eventProcessor) push(event watch.Event) {
|
||||
e.cond.L.Lock()
|
||||
defer e.cond.L.Unlock()
|
||||
defer e.cond.Signal()
|
||||
e.buff = append(e.buff, event)
|
||||
}
|
||||
|
||||
func (e *eventProcessor) stopped() bool {
|
||||
select {
|
||||
case <-e.done:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (e *eventProcessor) stop() {
|
||||
close(e.done)
|
||||
e.cond.Signal()
|
||||
}
|
||||
|
||||
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
|
||||
@@ -61,55 +105,44 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
|
||||
// it also returns a channel you can use to wait for the informers to fully shutdown.
|
||||
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
|
||||
ch := make(chan watch.Event)
|
||||
doneCh := make(chan struct{})
|
||||
w := watch.NewProxyWatcher(ch)
|
||||
t := newTicketer()
|
||||
e := newEventProcessor(ch)
|
||||
|
||||
indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
go t.WaitForTicket(t.GetTicket(), func() {
|
||||
select {
|
||||
case ch <- watch.Event{
|
||||
Type: watch.Added,
|
||||
Object: obj.(runtime.Object),
|
||||
}:
|
||||
case <-w.StopChan():
|
||||
}
|
||||
e.push(watch.Event{
|
||||
Type: watch.Added,
|
||||
Object: obj.(runtime.Object),
|
||||
})
|
||||
},
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
go t.WaitForTicket(t.GetTicket(), func() {
|
||||
select {
|
||||
case ch <- watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: new.(runtime.Object),
|
||||
}:
|
||||
case <-w.StopChan():
|
||||
}
|
||||
e.push(watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: new.(runtime.Object),
|
||||
})
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
go t.WaitForTicket(t.GetTicket(), func() {
|
||||
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
|
||||
if stale {
|
||||
// We have no means of passing the additional information down using watch API based on watch.Event
|
||||
// but the caller can filter such objects by checking if metadata.deletionTimestamp is set
|
||||
obj = staleObj
|
||||
}
|
||||
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
|
||||
if stale {
|
||||
// We have no means of passing the additional information down using
|
||||
// watch API based on watch.Event but the caller can filter such
|
||||
// objects by checking if metadata.deletionTimestamp is set
|
||||
obj = staleObj
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- watch.Event{
|
||||
Type: watch.Deleted,
|
||||
Object: obj.(runtime.Object),
|
||||
}:
|
||||
case <-w.StopChan():
|
||||
}
|
||||
e.push(watch.Event{
|
||||
Type: watch.Deleted,
|
||||
Object: obj.(runtime.Object),
|
||||
})
|
||||
},
|
||||
}, cache.Indexers{})
|
||||
|
||||
go e.run()
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
defer e.stop()
|
||||
informer.Run(w.StopChan())
|
||||
}()
|
||||
|
||||
|
||||
@@ -17,8 +17,9 @@ limitations under the License.
|
||||
package watch
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"context"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -28,6 +29,7 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
fakeclientset "k8s.io/client-go/kubernetes/fake"
|
||||
@@ -35,6 +37,86 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// TestEventProcessorExit is expected to timeout if the event processor fails
|
||||
// to exit when stopped.
|
||||
func TestEventProcessorExit(t *testing.T) {
|
||||
event := watch.Event{}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
write func(e *eventProcessor)
|
||||
}{
|
||||
{
|
||||
name: "exit on blocked read",
|
||||
write: func(e *eventProcessor) {
|
||||
e.push(event)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exit on blocked write",
|
||||
write: func(e *eventProcessor) {
|
||||
e.push(event)
|
||||
e.push(event)
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
out := make(chan watch.Event)
|
||||
e := newEventProcessor(out)
|
||||
|
||||
test.write(e)
|
||||
|
||||
exited := make(chan struct{})
|
||||
go func() {
|
||||
e.run()
|
||||
close(exited)
|
||||
}()
|
||||
|
||||
<-out
|
||||
e.stop()
|
||||
goruntime.Gosched()
|
||||
<-exited
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type apiInt int
|
||||
|
||||
func (apiInt) GetObjectKind() schema.ObjectKind { return nil }
|
||||
func (apiInt) DeepCopyObject() runtime.Object { return nil }
|
||||
|
||||
func TestEventProcessorOrdersEvents(t *testing.T) {
|
||||
out := make(chan watch.Event)
|
||||
e := newEventProcessor(out)
|
||||
go e.run()
|
||||
|
||||
numProcessed := 0
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
go func() {
|
||||
for i := 0; i < 1000; i++ {
|
||||
e := <-out
|
||||
if got, want := int(e.Object.(apiInt)), i; got != want {
|
||||
t.Errorf("unexpected event: got=%d, want=%d", got, want)
|
||||
}
|
||||
numProcessed++
|
||||
}
|
||||
cancel()
|
||||
}()
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
e.push(watch.Event{Object: apiInt(i)})
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
e.stop()
|
||||
|
||||
if numProcessed != 1000 {
|
||||
t.Errorf("unexpected number of events processed: %d", numProcessed)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type byEventTypeAndName []watch.Event
|
||||
|
||||
func (a byEventTypeAndName) Len() int { return len(a) }
|
||||
@@ -51,44 +133,6 @@ func (a byEventTypeAndName) Less(i, j int) bool {
|
||||
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
|
||||
}
|
||||
|
||||
func TestTicketer(t *testing.T) {
|
||||
tg := newTicketer()
|
||||
|
||||
const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines
|
||||
var tickets []uint64
|
||||
for i := 0; i < numTickets; i++ {
|
||||
ticket := tg.GetTicket()
|
||||
tickets = append(tickets, ticket)
|
||||
|
||||
exp, got := uint64(i), ticket
|
||||
if got != exp {
|
||||
t.Fatalf("expected ticket %d, got %d", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// shuffle tickets
|
||||
rand.Shuffle(len(tickets), func(i, j int) {
|
||||
tickets[i], tickets[j] = tickets[j], tickets[i]
|
||||
})
|
||||
|
||||
res := make(chan uint64, len(tickets))
|
||||
for _, ticket := range tickets {
|
||||
go func(ticket uint64) {
|
||||
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
|
||||
tg.WaitForTicket(ticket, func() {
|
||||
res <- ticket
|
||||
})
|
||||
}(ticket)
|
||||
}
|
||||
|
||||
for i := 0; i < numTickets; i++ {
|
||||
exp, got := uint64(i), <-res
|
||||
if got != exp {
|
||||
t.Fatalf("expected ticket %d, got %d", exp, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewInformerWatcher(t *testing.T) {
|
||||
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
|
||||
tt := []struct {
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package certificate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
cryptorand "crypto/rand"
|
||||
@@ -148,9 +149,13 @@ type CSRClientFunc func(current *tls.Certificate) (certificatesclient.Certificat
|
||||
func (e *NoCertKeyError) Error() string { return string(*e) }
|
||||
|
||||
type manager struct {
|
||||
getTemplate func() *x509.CertificateRequest
|
||||
lastRequestLock sync.Mutex
|
||||
lastRequest *x509.CertificateRequest
|
||||
getTemplate func() *x509.CertificateRequest
|
||||
|
||||
// lastRequestLock guards lastRequestCancel and lastRequest
|
||||
lastRequestLock sync.Mutex
|
||||
lastRequestCancel context.CancelFunc
|
||||
lastRequest *x509.CertificateRequest
|
||||
|
||||
dynamicTemplate bool
|
||||
usages []certificates.KeyUsage
|
||||
forceRotation bool
|
||||
@@ -261,7 +266,8 @@ func (m *manager) Start() {
|
||||
case <-timer.C:
|
||||
// unblock when deadline expires
|
||||
case <-templateChanged:
|
||||
if reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
|
||||
_, lastRequestTemplate := m.getLastRequest()
|
||||
if reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
|
||||
// if the template now matches what we last requested, restart the rotation deadline loop
|
||||
return
|
||||
}
|
||||
@@ -289,10 +295,19 @@ func (m *manager) Start() {
|
||||
if m.dynamicTemplate {
|
||||
go wait.Until(func() {
|
||||
// check if the current template matches what we last requested
|
||||
if !m.certSatisfiesTemplate() && !reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
|
||||
lastRequestCancel, lastRequestTemplate := m.getLastRequest()
|
||||
|
||||
if !m.certSatisfiesTemplate() && !reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
|
||||
// if the template is different, queue up an interrupt of the rotation deadline loop.
|
||||
// if we've requested a CSR that matches the new template by the time the interrupt is handled, the interrupt is disregarded.
|
||||
templateChanged <- struct{}{}
|
||||
if lastRequestCancel != nil {
|
||||
// if we're currently waiting on a submitted request that no longer matches what we want, stop waiting
|
||||
lastRequestCancel()
|
||||
}
|
||||
select {
|
||||
case templateChanged <- struct{}{}:
|
||||
case <-m.stopCh:
|
||||
}
|
||||
}
|
||||
}, time.Second, m.stopCh)
|
||||
}
|
||||
@@ -386,12 +401,15 @@ func (m *manager) rotateCerts() (bool, error) {
|
||||
return false, m.updateServerError(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), certificateWaitTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Once we've successfully submitted a CSR for this template, record that we did so
|
||||
m.setLastRequest(template)
|
||||
m.setLastRequest(cancel, template)
|
||||
|
||||
// Wait for the certificate to be signed. This interface and internal timout
|
||||
// is a remainder after the old design using raw watch wrapped with backoff.
|
||||
crtPEM, err := csr.WaitForCertificate(client, req, certificateWaitTimeout)
|
||||
crtPEM, err := csr.WaitForCertificate(ctx, client, req)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
|
||||
return false, nil
|
||||
@@ -561,14 +579,15 @@ func (m *manager) generateCSR() (template *x509.CertificateRequest, csrPEM []byt
|
||||
return template, csrPEM, keyPEM, privateKey, nil
|
||||
}
|
||||
|
||||
func (m *manager) getLastRequest() *x509.CertificateRequest {
|
||||
func (m *manager) getLastRequest() (context.CancelFunc, *x509.CertificateRequest) {
|
||||
m.lastRequestLock.Lock()
|
||||
defer m.lastRequestLock.Unlock()
|
||||
return m.lastRequest
|
||||
return m.lastRequestCancel, m.lastRequest
|
||||
}
|
||||
|
||||
func (m *manager) setLastRequest(r *x509.CertificateRequest) {
|
||||
func (m *manager) setLastRequest(cancel context.CancelFunc, r *x509.CertificateRequest) {
|
||||
m.lastRequestLock.Lock()
|
||||
defer m.lastRequestLock.Unlock()
|
||||
m.lastRequestCancel = cancel
|
||||
m.lastRequest = r
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
|
||||
}
|
||||
|
||||
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
|
||||
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
|
||||
func WaitForCertificate(ctx context.Context, client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest) (certData []byte, err error) {
|
||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
@@ -94,8 +94,6 @@ func WaitForCertificate(client certificatesclient.CertificateSigningRequestInter
|
||||
return client.Watch(options)
|
||||
},
|
||||
}
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
event, err := watchtools.UntilWithSync(
|
||||
ctx,
|
||||
lw,
|
||||
|
||||
@@ -18,6 +18,7 @@ package workqueue
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
@@ -66,6 +67,8 @@ type delayingType struct {
|
||||
|
||||
// stopCh lets us signal a shutdown to the waiting loop
|
||||
stopCh chan struct{}
|
||||
// stopOnce guarantees we only signal shutdown a single time
|
||||
stopOnce sync.Once
|
||||
|
||||
// heartbeat ensures we wait no more than maxWait before firing
|
||||
heartbeat clock.Ticker
|
||||
@@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} {
|
||||
return pq[0]
|
||||
}
|
||||
|
||||
// ShutDown gives a way to shut off this queue
|
||||
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
||||
// on Get() will be true. This method may be invoked more than once.
|
||||
func (q *delayingType) ShutDown() {
|
||||
q.Interface.ShutDown()
|
||||
close(q.stopCh)
|
||||
q.heartbeat.Stop()
|
||||
q.stopOnce.Do(func() {
|
||||
q.Interface.ShutDown()
|
||||
close(q.stopCh)
|
||||
q.heartbeat.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// AddAfter adds the given item to the work queue after the given delay
|
||||
|
||||
Reference in New Issue
Block a user