mirror of
https://github.com/kubernetes/client-go.git
synced 2026-06-09 18:55:26 +00:00
Compare commits
38 Commits
v0.32.0-al
...
kubernetes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1e90b7a36f | ||
|
|
1d2c64df00 | ||
|
|
0810080103 | ||
|
|
0976f4afe2 | ||
|
|
0673906c42 | ||
|
|
a2e13ccd49 | ||
|
|
127c8ce629 | ||
|
|
26011c05ab | ||
|
|
9bcc910985 | ||
|
|
3ed0010e30 | ||
|
|
c0015c0cc1 | ||
|
|
22ef81a898 | ||
|
|
7d76947ccd | ||
|
|
7615511cc0 | ||
|
|
a99ac36e1a | ||
|
|
424bde0e01 | ||
|
|
958c03a8dd | ||
|
|
9a62005bf9 | ||
|
|
d64f019211 | ||
|
|
73e7057101 | ||
|
|
2c91b666c2 | ||
|
|
94836903b2 | ||
|
|
4ea001dc73 | ||
|
|
f7ff47eb37 | ||
|
|
45ef4fc0dc | ||
|
|
c428844852 | ||
|
|
a0a9a3744f | ||
|
|
fa64ba9235 | ||
|
|
ed62839de3 | ||
|
|
85e546b378 | ||
|
|
465fab1523 | ||
|
|
c076c6e063 | ||
|
|
171f146b90 | ||
|
|
98cbc2c6ac | ||
|
|
501db86fbe | ||
|
|
ce329ac04e | ||
|
|
dd7f3ad83f | ||
|
|
ba984c792f |
10
Godeps/Godeps.json
generated
10
Godeps/Godeps.json
generated
@@ -54,6 +54,10 @@
|
||||
"ImportPath": "github.com/google/btree",
|
||||
"Rev": "7d79101e329e"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/google/go-cmp",
|
||||
"Rev": "v0.3.0"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/google/gofuzz",
|
||||
"Rev": "24818f796faf"
|
||||
@@ -184,15 +188,15 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/api",
|
||||
"Rev": "e63b5755afac"
|
||||
"Rev": "e0137a30be7f"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery",
|
||||
"Rev": "ad85901afca0"
|
||||
"Rev": "587ae2000094"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/klog",
|
||||
"Rev": "v0.3.0"
|
||||
"Rev": "v0.3.1"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi",
|
||||
|
||||
@@ -172,7 +172,7 @@ func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
|
||||
if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
|
||||
if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Obj
|
||||
return err
|
||||
}
|
||||
|
||||
err = os.Chmod(f.Name(), 0755)
|
||||
err = os.Chmod(f.Name(), 0660)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ package disk
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -96,6 +97,32 @@ func TestNewCachedDiscoveryClient_TTL(t *testing.T) {
|
||||
assert.Equal(c.groupCalls, 2)
|
||||
}
|
||||
|
||||
func TestNewCachedDiscoveryClient_PathPerm(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
d, err := ioutil.TempDir("", "")
|
||||
assert.NoError(err)
|
||||
os.RemoveAll(d)
|
||||
defer os.RemoveAll(d)
|
||||
|
||||
c := fakeDiscoveryClient{}
|
||||
cdc := newCachedDiscoveryClient(&c, d, 1*time.Nanosecond)
|
||||
cdc.ServerGroups()
|
||||
|
||||
err = filepath.Walk(d, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
assert.Equal(os.FileMode(0750), info.Mode().Perm())
|
||||
} else {
|
||||
assert.Equal(os.FileMode(0660), info.Mode().Perm())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
type fakeDiscoveryClient struct {
|
||||
groupCalls int
|
||||
resourceCalls int
|
||||
|
||||
@@ -18,6 +18,7 @@ package disk
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/gregjones/httpcache"
|
||||
@@ -35,6 +36,8 @@ type cacheRoundTripper struct {
|
||||
// corresponding requests.
|
||||
func newCacheRoundTripper(cacheDir string, rt http.RoundTripper) http.RoundTripper {
|
||||
d := diskv.New(diskv.Options{
|
||||
PathPerm: os.FileMode(0750),
|
||||
FilePerm: os.FileMode(0660),
|
||||
BasePath: cacheDir,
|
||||
TempDir: filepath.Join(cacheDir, ".diskv-temp"),
|
||||
})
|
||||
|
||||
@@ -22,7 +22,10 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// copied from k8s.io/client-go/transport/round_trippers_test.go
|
||||
@@ -93,3 +96,52 @@ func TestCacheRoundTripper(t *testing.T) {
|
||||
t.Errorf("Invalid content read from cache %q", string(content))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheRoundTripperPathPerm(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
rt := &testRoundTripper{}
|
||||
cacheDir, err := ioutil.TempDir("", "cache-rt")
|
||||
os.RemoveAll(cacheDir)
|
||||
defer os.RemoveAll(cacheDir)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cache := newCacheRoundTripper(cacheDir, rt)
|
||||
|
||||
// First call, caches the response
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: &url.URL{Host: "localhost"},
|
||||
}
|
||||
rt.Response = &http.Response{
|
||||
Header: http.Header{"ETag": []string{`"123456"`}},
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("Content"))),
|
||||
StatusCode: http.StatusOK,
|
||||
}
|
||||
resp, err := cache.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
content, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(content) != "Content" {
|
||||
t.Errorf(`Expected Body to be "Content", got %q`, string(content))
|
||||
}
|
||||
|
||||
err = filepath.Walk(cacheDir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
assert.Equal(os.FileMode(0750), info.Mode().Perm())
|
||||
} else {
|
||||
assert.Equal(os.FileMode(0660), info.Mode().Perm())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultRe
|
||||
return &dynamicSharedInformerFactory{
|
||||
client: client,
|
||||
defaultResync: defaultResync,
|
||||
namespace: metav1.NamespaceAll,
|
||||
namespace: namespace,
|
||||
informers: map[schema.GroupVersionResource]informers.GenericInformer{},
|
||||
startedInformers: make(map[schema.GroupVersionResource]bool),
|
||||
tweakListOptions: tweakListOptions,
|
||||
|
||||
10
go.mod
10
go.mod
@@ -26,9 +26,9 @@ require (
|
||||
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a
|
||||
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d
|
||||
google.golang.org/appengine v1.5.0 // indirect
|
||||
k8s.io/api v0.0.0-20190511023547-e63b5755afac
|
||||
k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0
|
||||
k8s.io/klog v0.3.0
|
||||
k8s.io/api v0.0.0-20190620085005-e0137a30be7f
|
||||
k8s.io/apimachinery v0.0.0-20190528154421-587ae2000094
|
||||
k8s.io/klog v0.3.1
|
||||
k8s.io/utils v0.0.0-20190221042446-c2654d5206da
|
||||
sigs.k8s.io/yaml v1.1.0
|
||||
)
|
||||
@@ -37,6 +37,6 @@ replace (
|
||||
golang.org/x/sync => golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
|
||||
golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503
|
||||
golang.org/x/tools => golang.org/x/tools v0.0.0-20190313210603-aa82965741a9
|
||||
k8s.io/api => k8s.io/api v0.0.0-20190511023547-e63b5755afac
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0
|
||||
k8s.io/api => k8s.io/api v0.0.0-20190620085005-e0137a30be7f
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190528154421-587ae2000094
|
||||
)
|
||||
|
||||
12
go.sum
12
go.sum
@@ -22,8 +22,11 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/google/btree v0.0.0-20160524151835-7d79101e329e h1:JHB7F/4TJCrYBW8+GZO8VkWDj1jxcWuCl6uxKODiyi4=
|
||||
github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck=
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
|
||||
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
|
||||
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
|
||||
@@ -88,10 +91,11 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
k8s.io/api v0.0.0-20190511023547-e63b5755afac/go.mod h1:f265Ep4XvHtyggfaaNhtP0AtAFilBaXq2fPjm5kYxwI=
|
||||
k8s.io/apimachinery v0.0.0-20190511023455-ad85901afca0/go.mod h1:5CBnzrKYGHzv9ZsSKmQ8wHt4XI4/TUBPDwYM9FlZMyw=
|
||||
k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE=
|
||||
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
k8s.io/api v0.0.0-20190620085005-e0137a30be7f/go.mod h1:kSN4o7sxlIvlMOEe8JbQDVmC4xvMyyK0eQpOLHALC8U=
|
||||
k8s.io/apimachinery v0.0.0-20190528154421-587ae2000094 h1:xcudKBvgY20fHRibq7eY2Uy+whngBZbhCtAW5hnD9Lg=
|
||||
k8s.io/apimachinery v0.0.0-20190528154421-587ae2000094/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA=
|
||||
k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
|
||||
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
|
||||
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
|
||||
k8s.io/utils v0.0.0-20190221042446-c2654d5206da h1:ElyM7RPonbKnQqOcw7dG2IK5uvQQn3b/WPHqD5mBvP4=
|
||||
|
||||
@@ -74,9 +74,10 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
},
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
BearerToken: c.BearerToken,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
BearerToken: c.BearerToken,
|
||||
BearerTokenFile: c.BearerTokenFile,
|
||||
Impersonate: transport.ImpersonationConfig{
|
||||
UserName: c.Impersonate.UserName,
|
||||
Groups: c.Impersonate.Groups,
|
||||
|
||||
@@ -105,7 +105,7 @@ func LoadFromFile(path string) (*Info, error) {
|
||||
// The fields of client.Config with a corresponding field in the Info are set
|
||||
// with the value from the Info.
|
||||
func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error) {
|
||||
var config restclient.Config = c
|
||||
var config = c
|
||||
config.Username = info.User
|
||||
config.Password = info.Password
|
||||
config.CAFile = info.CAFile
|
||||
@@ -118,6 +118,7 @@ func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error)
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// Complete returns true if the Kubernetes API authorization info is complete.
|
||||
func (info Info) Complete() bool {
|
||||
return len(info.User) > 0 ||
|
||||
len(info.CertFile) > 0 ||
|
||||
|
||||
2
tools/cache/reflector.go
vendored
2
tools/cache/reflector.go
vendored
@@ -83,7 +83,7 @@ var (
|
||||
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
|
||||
// The indexer is configured to key on namespace
|
||||
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
|
||||
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
|
||||
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
|
||||
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
|
||||
return indexer, reflector
|
||||
}
|
||||
|
||||
@@ -228,6 +228,7 @@ func (config *DirectClientConfig) getUserIdentificationPartialConfig(configAuthI
|
||||
// blindly overwrite existing values based on precedence
|
||||
if len(configAuthInfo.Token) > 0 {
|
||||
mergedConfig.BearerToken = configAuthInfo.Token
|
||||
mergedConfig.BearerTokenFile = configAuthInfo.TokenFile
|
||||
} else if len(configAuthInfo.TokenFile) > 0 {
|
||||
tokenBytes, err := ioutil.ReadFile(configAuthInfo.TokenFile)
|
||||
if err != nil {
|
||||
@@ -489,8 +490,9 @@ func (config *inClusterClientConfig) ClientConfig() (*restclient.Config, error)
|
||||
if server := config.overrides.ClusterInfo.Server; len(server) > 0 {
|
||||
icc.Host = server
|
||||
}
|
||||
if token := config.overrides.AuthInfo.Token; len(token) > 0 {
|
||||
icc.BearerToken = token
|
||||
if len(config.overrides.AuthInfo.Token) > 0 || len(config.overrides.AuthInfo.TokenFile) > 0 {
|
||||
icc.BearerToken = config.overrides.AuthInfo.Token
|
||||
icc.BearerTokenFile = config.overrides.AuthInfo.TokenFile
|
||||
}
|
||||
if certificateAuthorityFile := config.overrides.ClusterInfo.CertificateAuthority; len(certificateAuthorityFile) > 0 {
|
||||
icc.TLSClientConfig.CAFile = certificateAuthorityFile
|
||||
|
||||
@@ -548,6 +548,30 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
overrides: &ConfigOverrides{
|
||||
ClusterInfo: clientcmdapi.Cluster{
|
||||
Server: "https://host-from-overrides.com",
|
||||
CertificateAuthority: "/path/to/ca-from-overrides.crt",
|
||||
},
|
||||
AuthInfo: clientcmdapi.AuthInfo{
|
||||
Token: "token-from-override",
|
||||
TokenFile: "tokenfile-from-override",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
overrides: &ConfigOverrides{
|
||||
ClusterInfo: clientcmdapi.Cluster{
|
||||
Server: "https://host-from-overrides.com",
|
||||
CertificateAuthority: "/path/to/ca-from-overrides.crt",
|
||||
},
|
||||
AuthInfo: clientcmdapi.AuthInfo{
|
||||
Token: "",
|
||||
TokenFile: "tokenfile-from-override",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
overrides: &ConfigOverrides{},
|
||||
},
|
||||
@@ -556,13 +580,15 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
for _, tc := range tt {
|
||||
expectedServer := "https://host-from-cluster.com"
|
||||
expectedToken := "token-from-cluster"
|
||||
expectedTokenFile := "tokenfile-from-cluster"
|
||||
expectedCAFile := "/path/to/ca-from-cluster.crt"
|
||||
|
||||
icc := &inClusterClientConfig{
|
||||
inClusterConfigProvider: func() (*restclient.Config, error) {
|
||||
return &restclient.Config{
|
||||
Host: expectedServer,
|
||||
BearerToken: expectedToken,
|
||||
Host: expectedServer,
|
||||
BearerToken: expectedToken,
|
||||
BearerTokenFile: expectedTokenFile,
|
||||
TLSClientConfig: restclient.TLSClientConfig{
|
||||
CAFile: expectedCAFile,
|
||||
},
|
||||
@@ -579,8 +605,9 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
if overridenServer := tc.overrides.ClusterInfo.Server; len(overridenServer) > 0 {
|
||||
expectedServer = overridenServer
|
||||
}
|
||||
if overridenToken := tc.overrides.AuthInfo.Token; len(overridenToken) > 0 {
|
||||
expectedToken = overridenToken
|
||||
if len(tc.overrides.AuthInfo.Token) > 0 || len(tc.overrides.AuthInfo.TokenFile) > 0 {
|
||||
expectedToken = tc.overrides.AuthInfo.Token
|
||||
expectedTokenFile = tc.overrides.AuthInfo.TokenFile
|
||||
}
|
||||
if overridenCAFile := tc.overrides.ClusterInfo.CertificateAuthority; len(overridenCAFile) > 0 {
|
||||
expectedCAFile = overridenCAFile
|
||||
@@ -592,6 +619,9 @@ func TestInClusterClientConfigPrecedence(t *testing.T) {
|
||||
if clientConfig.BearerToken != expectedToken {
|
||||
t.Errorf("Expected token %v, got %v", expectedToken, clientConfig.BearerToken)
|
||||
}
|
||||
if clientConfig.BearerTokenFile != expectedTokenFile {
|
||||
t.Errorf("Expected tokenfile %v, got %v", expectedTokenFile, clientConfig.BearerTokenFile)
|
||||
}
|
||||
if clientConfig.TLSClientConfig.CAFile != expectedCAFile {
|
||||
t.Errorf("Expected Certificate Authority %v, got %v", expectedCAFile, clientConfig.TLSClientConfig.CAFile)
|
||||
}
|
||||
|
||||
285
tools/events/event_broadcaster.go
Normal file
285
tools/events/event_broadcaster.go
Normal file
@@ -0,0 +1,285 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
|
||||
"k8s.io/api/events/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record/util"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
const (
|
||||
maxTriesPerEvent = 12
|
||||
finishTime = 6 * time.Minute
|
||||
refreshTime = 30 * time.Minute
|
||||
maxQueuedEvents = 1000
|
||||
)
|
||||
|
||||
var defaultSleepDuration = 10 * time.Second
|
||||
|
||||
// TODO: validate impact of copying and investigate hashing
|
||||
type eventKey struct {
|
||||
action string
|
||||
reason string
|
||||
reportingController string
|
||||
reportingInstance string
|
||||
regarding corev1.ObjectReference
|
||||
related corev1.ObjectReference
|
||||
}
|
||||
|
||||
type eventBroadcasterImpl struct {
|
||||
*watch.Broadcaster
|
||||
mu sync.Mutex
|
||||
eventCache map[eventKey]*v1beta1.Event
|
||||
sleepDuration time.Duration
|
||||
sink EventSink
|
||||
}
|
||||
|
||||
// NewBroadcaster Creates a new event broadcaster.
|
||||
func NewBroadcaster(sink EventSink) EventBroadcaster {
|
||||
return newBroadcaster(sink, defaultSleepDuration)
|
||||
}
|
||||
|
||||
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
|
||||
func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster {
|
||||
return &eventBroadcasterImpl{
|
||||
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
||||
eventCache: map[eventKey]*v1beta1.Event{},
|
||||
sleepDuration: sleepDuration,
|
||||
sink: sink,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: add test for refreshExistingEventSeries
|
||||
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
||||
// TODO: Investigate whether lock contention won't be a problem
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
for isomorphicKey, event := range e.eventCache {
|
||||
if event.Series != nil {
|
||||
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||
e.eventCache[isomorphicKey] = recordedEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: add test for finishSeries
|
||||
func (e *eventBroadcasterImpl) finishSeries() {
|
||||
// TODO: Investigate whether lock contention won't be a problem
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
for isomorphicKey, event := range e.eventCache {
|
||||
eventSeries := event.Series
|
||||
if eventSeries != nil {
|
||||
if eventSeries.LastObservedTime.Time.Add(finishTime).Before(time.Now()) {
|
||||
if _, retry := recordEvent(e.sink, event); !retry {
|
||||
delete(e.eventCache, isomorphicKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewRecorder returns an EventRecorder that records events with the given event source.
|
||||
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
|
||||
hostname, _ := os.Hostname()
|
||||
reportingInstance := reportingController + "-" + hostname
|
||||
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
|
||||
}
|
||||
|
||||
func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
eventCopy := event.DeepCopy()
|
||||
go func() {
|
||||
evToRecord := func() *v1beta1.Event {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
eventKey := getKey(eventCopy)
|
||||
isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
|
||||
if isIsomorphic {
|
||||
if isomorphicEvent.Series != nil {
|
||||
isomorphicEvent.Series.Count++
|
||||
isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
|
||||
return nil
|
||||
}
|
||||
isomorphicEvent.Series = &v1beta1.EventSeries{
|
||||
Count: 1,
|
||||
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
|
||||
}
|
||||
return isomorphicEvent
|
||||
}
|
||||
e.eventCache[eventKey] = eventCopy
|
||||
return eventCopy
|
||||
}()
|
||||
if evToRecord != nil {
|
||||
recordedEvent := e.attemptRecording(evToRecord)
|
||||
if recordedEvent != nil {
|
||||
recordedEventKey := getKey(recordedEvent)
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.eventCache[recordedEventKey] = recordedEvent
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event {
|
||||
tries := 0
|
||||
for {
|
||||
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||
return recordedEvent
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
return nil
|
||||
}
|
||||
// Randomize sleep so that various clients won't all be
|
||||
// synced up if the master goes down.
|
||||
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
|
||||
}
|
||||
}
|
||||
|
||||
func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
|
||||
var newEvent *v1beta1.Event
|
||||
var err error
|
||||
isEventSeries := event.Series != nil
|
||||
if isEventSeries {
|
||||
patch, err := createPatchBytesForSeries(event)
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to calculate diff, no merge is possible: %v", err)
|
||||
return nil, false
|
||||
}
|
||||
newEvent, err = sink.Patch(event, patch)
|
||||
}
|
||||
// Update can fail because the event may have been removed and it no longer exists.
|
||||
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
|
||||
// Making sure that ResourceVersion is empty on creation
|
||||
event.ResourceVersion = ""
|
||||
newEvent, err = sink.Create(event)
|
||||
}
|
||||
if err == nil {
|
||||
return newEvent, false
|
||||
}
|
||||
// If we can't contact the server, then hold everything while we keep trying.
|
||||
// Otherwise, something about the event is malformed and we should abandon it.
|
||||
switch err.(type) {
|
||||
case *restclient.RequestConstructionError:
|
||||
// We will construct the request the same next time, so don't keep trying.
|
||||
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
|
||||
return nil, false
|
||||
case *errors.StatusError:
|
||||
if errors.IsAlreadyExists(err) {
|
||||
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
} else {
|
||||
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
}
|
||||
return nil, false
|
||||
case *errors.UnexpectedObjectError:
|
||||
// We don't expect this; it implies the server's response didn't match a
|
||||
// known pattern. Go ahead and retry.
|
||||
default:
|
||||
// This case includes actual http transport errors. Go ahead and retry.
|
||||
}
|
||||
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
|
||||
return nil, true
|
||||
}
|
||||
|
||||
func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) {
|
||||
oldEvent := event.DeepCopy()
|
||||
oldEvent.Series = nil
|
||||
oldData, err := json.Marshal(oldEvent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newData, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{})
|
||||
}
|
||||
|
||||
func getKey(event *v1beta1.Event) eventKey {
|
||||
key := eventKey{
|
||||
action: event.Action,
|
||||
reason: event.Reason,
|
||||
reportingController: event.ReportingController,
|
||||
reportingInstance: event.ReportingInstance,
|
||||
regarding: event.Regarding,
|
||||
}
|
||||
if event.Related != nil {
|
||||
key.related = *event.Related
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// startEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||
// The return value is used to stop recording
|
||||
func (e *eventBroadcasterImpl) startEventWatcher(eventHandler func(event runtime.Object)) func() {
|
||||
watcher := e.Watch()
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
for {
|
||||
watchEvent, ok := <-watcher.ResultChan()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
eventHandler(watchEvent.Object)
|
||||
}
|
||||
}()
|
||||
return watcher.Stop
|
||||
}
|
||||
|
||||
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
|
||||
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
|
||||
go wait.Until(func() {
|
||||
e.refreshExistingEventSeries()
|
||||
}, refreshTime, stopCh)
|
||||
go wait.Until(func() {
|
||||
e.finishSeries()
|
||||
}, finishTime, stopCh)
|
||||
eventHandler := func(obj runtime.Object) {
|
||||
event, ok := obj.(*v1beta1.Event)
|
||||
if !ok {
|
||||
klog.Errorf("unexpected type, expected v1beta1.Event")
|
||||
return
|
||||
}
|
||||
e.recordToSink(event, clock.RealClock{})
|
||||
}
|
||||
stopWatcher := e.startEventWatcher(eventHandler)
|
||||
go func() {
|
||||
<-stopCh
|
||||
stopWatcher()
|
||||
}()
|
||||
}
|
||||
89
tools/events/event_recorder.go
Normal file
89
tools/events/event_recorder.go
Normal file
@@ -0,0 +1,89 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/reference"
|
||||
|
||||
"k8s.io/api/events/v1beta1"
|
||||
"k8s.io/client-go/tools/record/util"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
type recorderImpl struct {
|
||||
scheme *runtime.Scheme
|
||||
reportingController string
|
||||
reportingInstance string
|
||||
*watch.Broadcaster
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
|
||||
timestamp := metav1.MicroTime{time.Now()}
|
||||
message := fmt.Sprintf(note, args...)
|
||||
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
|
||||
if err != nil {
|
||||
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
|
||||
return
|
||||
}
|
||||
refRelated, err := reference.GetReference(recorder.scheme, related)
|
||||
if err != nil {
|
||||
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'.", related, err)
|
||||
}
|
||||
if !util.ValidateEventType(eventtype) {
|
||||
klog.Errorf("Unsupported event type: '%v'", eventtype)
|
||||
return
|
||||
}
|
||||
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
recorder.Action(watch.Added, event)
|
||||
}()
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *v1beta1.Event {
|
||||
t := metav1.Time{Time: recorder.clock.Now()}
|
||||
namespace := refRegarding.Namespace
|
||||
if namespace == "" {
|
||||
namespace = metav1.NamespaceSystem
|
||||
}
|
||||
return &v1beta1.Event{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
|
||||
Namespace: namespace,
|
||||
},
|
||||
EventTime: timestamp,
|
||||
Series: nil,
|
||||
ReportingController: reportingController,
|
||||
ReportingInstance: reportingInstance,
|
||||
Action: action,
|
||||
Reason: reason,
|
||||
Regarding: *refRegarding,
|
||||
Related: refRelated,
|
||||
Note: message,
|
||||
Type: eventtype,
|
||||
}
|
||||
}
|
||||
208
tools/events/eventseries_test.go
Normal file
208
tools/events/eventseries_test.go
Normal file
@@ -0,0 +1,208 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/events/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
ref "k8s.io/client-go/tools/reference"
|
||||
)
|
||||
|
||||
type testEventSeriesSink struct {
|
||||
OnCreate func(e *v1beta1.Event) (*v1beta1.Event, error)
|
||||
OnUpdate func(e *v1beta1.Event) (*v1beta1.Event, error)
|
||||
OnPatch func(e *v1beta1.Event, p []byte) (*v1beta1.Event, error)
|
||||
}
|
||||
|
||||
// Create records the event for testing.
|
||||
func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
if t.OnCreate != nil {
|
||||
return t.OnCreate(e)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// Update records the event for testing.
|
||||
func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
if t.OnUpdate != nil {
|
||||
return t.OnUpdate(e)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// Patch records the event for testing.
|
||||
func (t *testEventSeriesSink) Patch(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) {
|
||||
if t.OnPatch != nil {
|
||||
return t.OnPatch(e, p)
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func TestEventSeriesf(t *testing.T) {
|
||||
hostname, _ := os.Hostname()
|
||||
|
||||
testPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
SelfLink: "/api/version/pods/foo",
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
},
|
||||
}
|
||||
|
||||
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expectedEvent := &v1beta1.Event{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
},
|
||||
EventTime: metav1.MicroTime{time.Now()},
|
||||
ReportingController: "eventTest",
|
||||
ReportingInstance: "eventTest-" + hostname,
|
||||
Action: "started",
|
||||
Reason: "test",
|
||||
Regarding: *regarding,
|
||||
Related: related,
|
||||
Note: "some verbose message: 1",
|
||||
Type: v1.EventTypeNormal,
|
||||
}
|
||||
|
||||
isomorphicEvent := expectedEvent.DeepCopy()
|
||||
|
||||
nonIsomorphicEvent := expectedEvent.DeepCopy()
|
||||
nonIsomorphicEvent.Action = "stopped"
|
||||
|
||||
expectedEvent.Series = &v1beta1.EventSeries{Count: 1}
|
||||
table := []struct {
|
||||
regarding k8sruntime.Object
|
||||
related k8sruntime.Object
|
||||
actual *v1beta1.Event
|
||||
elements []interface{}
|
||||
expect *v1beta1.Event
|
||||
expectUpdate bool
|
||||
}{
|
||||
{
|
||||
regarding: regarding,
|
||||
related: related,
|
||||
actual: isomorphicEvent,
|
||||
elements: []interface{}{1},
|
||||
expect: expectedEvent,
|
||||
expectUpdate: true,
|
||||
},
|
||||
{
|
||||
regarding: regarding,
|
||||
related: related,
|
||||
actual: nonIsomorphicEvent,
|
||||
elements: []interface{}{1},
|
||||
expect: nonIsomorphicEvent,
|
||||
expectUpdate: false,
|
||||
},
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
createEvent := make(chan *v1beta1.Event)
|
||||
updateEvent := make(chan *v1beta1.Event)
|
||||
patchEvent := make(chan *v1beta1.Event)
|
||||
|
||||
testEvents := testEventSeriesSink{
|
||||
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
createEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
updateEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||
// event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here
|
||||
// we'll use it directly
|
||||
patchEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
}
|
||||
eventBroadcaster := newBroadcaster(&testEvents, 0)
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
|
||||
eventBroadcaster.StartRecordingToSink(stopCh)
|
||||
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
|
||||
// read from the chan as this was needed only to populate the cache
|
||||
<-createEvent
|
||||
for index, item := range table {
|
||||
actual := item.actual
|
||||
recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
|
||||
// validate event
|
||||
if item.expectUpdate {
|
||||
actualEvent := <-patchEvent
|
||||
t.Logf("%v - validating event affected by patch request", index)
|
||||
validateEventSerie(strconv.Itoa(index), true, actualEvent, item.expect, t)
|
||||
} else {
|
||||
actualEvent := <-createEvent
|
||||
t.Logf("%v - validating event affected by a create request", index)
|
||||
validateEventSerie(strconv.Itoa(index), false, actualEvent, item.expect, t)
|
||||
}
|
||||
}
|
||||
close(stopCh)
|
||||
}
|
||||
|
||||
func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *v1beta1.Event, expectedEvent *v1beta1.Event, t *testing.T) {
|
||||
recvEvent := *actualEvent
|
||||
|
||||
// Just check that the timestamp was set.
|
||||
if recvEvent.EventTime.IsZero() {
|
||||
t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
|
||||
}
|
||||
|
||||
if expectedUpdate {
|
||||
if recvEvent.Series == nil {
|
||||
t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series)
|
||||
|
||||
} else {
|
||||
if recvEvent.Series.Count != expectedEvent.Series.Count {
|
||||
t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
|
||||
}
|
||||
}
|
||||
|
||||
// Check that name has the right prefix.
|
||||
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
|
||||
t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
|
||||
}
|
||||
} else {
|
||||
if recvEvent.Series != nil {
|
||||
t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
58
tools/events/interfaces.go
Normal file
58
tools/events/interfaces.go
Normal file
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"k8s.io/api/events/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
// EventRecorder knows how to record events on behalf of an EventSource.
|
||||
type EventRecorder interface {
|
||||
// Eventf constructs an event from the given information and puts it in the queue for sending.
|
||||
// 'regarding' is the object this event is about. Event will make a reference-- or you may also
|
||||
// pass a reference to the object directly.
|
||||
// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers
|
||||
// a creation or deletion of related object.
|
||||
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
|
||||
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
|
||||
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
|
||||
// to automate handling of events, so imagine people writing switch statements to handle them.
|
||||
// You want to make that easy.
|
||||
// 'note' is intended to be human readable.
|
||||
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
|
||||
}
|
||||
|
||||
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
|
||||
type EventBroadcaster interface {
|
||||
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
|
||||
StartRecordingToSink(stopCh <-chan struct{})
|
||||
|
||||
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
|
||||
// with the event source set to the given event source.
|
||||
NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder
|
||||
}
|
||||
|
||||
// EventSink knows how to store events (client-go implements it.)
|
||||
// EventSink must respect the namespace that will be embedded in 'event'.
|
||||
// It is assumed that EventSink will return the same sorts of errors as
|
||||
// client-go's REST client.
|
||||
type EventSink interface {
|
||||
Create(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||
Update(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||
Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error)
|
||||
}
|
||||
@@ -16,12 +16,16 @@ limitations under the License.
|
||||
|
||||
// Package leaderelection implements leader election of a set of endpoints.
|
||||
// It uses an annotation in the endpoints object to store the record of the
|
||||
// election state.
|
||||
// election state. This implementation does not guarantee that only one
|
||||
// client is acting as a leader (a.k.a. fencing).
|
||||
//
|
||||
// This implementation does not guarantee that only one client is acting as a
|
||||
// leader (a.k.a. fencing). A client observes timestamps captured locally to
|
||||
// infer the state of the leader election. Thus the implementation is tolerant
|
||||
// to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate.
|
||||
// A client only acts on timestamps captured locally to infer the state of the
|
||||
// leader election. The client does not consider timestamps in the leader
|
||||
// election record to be accurate because these timestamps may not have been
|
||||
// produced by a local clock. The implemention does not depend on their
|
||||
// accuracy and only uses their change to indicate that another client has
|
||||
// renewed the leader lease. Thus the implementation is tolerant to arbitrary
|
||||
// clock skew, but is not tolerant to arbitrary clock skew rate.
|
||||
//
|
||||
// However the level of tolerance to skew rate can be configured by setting
|
||||
// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
|
||||
@@ -105,12 +109,26 @@ type LeaderElectionConfig struct {
|
||||
// LeaseDuration is the duration that non-leader candidates will
|
||||
// wait to force acquire leadership. This is measured against time of
|
||||
// last observed ack.
|
||||
//
|
||||
// A client needs to wait a full LeaseDuration without observing a change to
|
||||
// the record before it can attempt to take over. When all clients are
|
||||
// shutdown and a new set of clients are started with different names against
|
||||
// the same leader record, they must wait the full LeaseDuration before
|
||||
// attempting to acquire the lease. Thus LeaseDuration should be as short as
|
||||
// possible (within your tolerance for clock skew rate) to avoid a possible
|
||||
// long waits in the scenario.
|
||||
//
|
||||
// Core clients default this value to 15 seconds.
|
||||
LeaseDuration time.Duration
|
||||
// RenewDeadline is the duration that the acting master will retry
|
||||
// refreshing leadership before giving up.
|
||||
//
|
||||
// Core clients default this value to 10 seconds.
|
||||
RenewDeadline time.Duration
|
||||
// RetryPeriod is the duration the LeaderElector clients should wait
|
||||
// between tries of actions.
|
||||
//
|
||||
// Core clients default this value to 2 seconds.
|
||||
RetryPeriod time.Duration
|
||||
|
||||
// Callbacks are callbacks that are triggered during certain lifecycle
|
||||
|
||||
@@ -33,8 +33,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
// PortForwardProtocolV1Name is the subprotocol used for port forwarding.
|
||||
// TODO move to API machinery and re-unify with kubelet/server/portfoward
|
||||
// The subprotocol "portforward.k8s.io" is used for port forwarding.
|
||||
const PortForwardProtocolV1Name = "portforward.k8s.io"
|
||||
|
||||
// PortForwarder knows how to listen for local connections and forward them to
|
||||
@@ -401,6 +401,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops all listeners of PortForwarder.
|
||||
func (pf *PortForwarder) Close() {
|
||||
// stop all listeners
|
||||
for _, l := range pf.listeners {
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package certificate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
cryptorand "crypto/rand"
|
||||
@@ -148,9 +149,13 @@ type CSRClientFunc func(current *tls.Certificate) (certificatesclient.Certificat
|
||||
func (e *NoCertKeyError) Error() string { return string(*e) }
|
||||
|
||||
type manager struct {
|
||||
getTemplate func() *x509.CertificateRequest
|
||||
lastRequestLock sync.Mutex
|
||||
lastRequest *x509.CertificateRequest
|
||||
getTemplate func() *x509.CertificateRequest
|
||||
|
||||
// lastRequestLock guards lastRequestCancel and lastRequest
|
||||
lastRequestLock sync.Mutex
|
||||
lastRequestCancel context.CancelFunc
|
||||
lastRequest *x509.CertificateRequest
|
||||
|
||||
dynamicTemplate bool
|
||||
usages []certificates.KeyUsage
|
||||
forceRotation bool
|
||||
@@ -261,7 +266,8 @@ func (m *manager) Start() {
|
||||
case <-timer.C:
|
||||
// unblock when deadline expires
|
||||
case <-templateChanged:
|
||||
if reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
|
||||
_, lastRequestTemplate := m.getLastRequest()
|
||||
if reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
|
||||
// if the template now matches what we last requested, restart the rotation deadline loop
|
||||
return
|
||||
}
|
||||
@@ -289,10 +295,19 @@ func (m *manager) Start() {
|
||||
if m.dynamicTemplate {
|
||||
go wait.Until(func() {
|
||||
// check if the current template matches what we last requested
|
||||
if !m.certSatisfiesTemplate() && !reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
|
||||
lastRequestCancel, lastRequestTemplate := m.getLastRequest()
|
||||
|
||||
if !m.certSatisfiesTemplate() && !reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
|
||||
// if the template is different, queue up an interrupt of the rotation deadline loop.
|
||||
// if we've requested a CSR that matches the new template by the time the interrupt is handled, the interrupt is disregarded.
|
||||
templateChanged <- struct{}{}
|
||||
if lastRequestCancel != nil {
|
||||
// if we're currently waiting on a submitted request that no longer matches what we want, stop waiting
|
||||
lastRequestCancel()
|
||||
}
|
||||
select {
|
||||
case templateChanged <- struct{}{}:
|
||||
case <-m.stopCh:
|
||||
}
|
||||
}
|
||||
}, time.Second, m.stopCh)
|
||||
}
|
||||
@@ -386,12 +401,15 @@ func (m *manager) rotateCerts() (bool, error) {
|
||||
return false, m.updateServerError(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), certificateWaitTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Once we've successfully submitted a CSR for this template, record that we did so
|
||||
m.setLastRequest(template)
|
||||
m.setLastRequest(cancel, template)
|
||||
|
||||
// Wait for the certificate to be signed. This interface and internal timout
|
||||
// is a remainder after the old design using raw watch wrapped with backoff.
|
||||
crtPEM, err := csr.WaitForCertificate(client, req, certificateWaitTimeout)
|
||||
crtPEM, err := csr.WaitForCertificate(ctx, client, req)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
|
||||
return false, nil
|
||||
@@ -561,14 +579,15 @@ func (m *manager) generateCSR() (template *x509.CertificateRequest, csrPEM []byt
|
||||
return template, csrPEM, keyPEM, privateKey, nil
|
||||
}
|
||||
|
||||
func (m *manager) getLastRequest() *x509.CertificateRequest {
|
||||
func (m *manager) getLastRequest() (context.CancelFunc, *x509.CertificateRequest) {
|
||||
m.lastRequestLock.Lock()
|
||||
defer m.lastRequestLock.Unlock()
|
||||
return m.lastRequest
|
||||
return m.lastRequestCancel, m.lastRequest
|
||||
}
|
||||
|
||||
func (m *manager) setLastRequest(r *x509.CertificateRequest) {
|
||||
func (m *manager) setLastRequest(cancel context.CancelFunc, r *x509.CertificateRequest) {
|
||||
m.lastRequestLock.Lock()
|
||||
defer m.lastRequestLock.Unlock()
|
||||
m.lastRequestCancel = cancel
|
||||
m.lastRequest = r
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
|
||||
}
|
||||
|
||||
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
|
||||
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
|
||||
func WaitForCertificate(ctx context.Context, client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest) (certData []byte, err error) {
|
||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
@@ -94,8 +94,6 @@ func WaitForCertificate(client certificatesclient.CertificateSigningRequestInter
|
||||
return client.Watch(options)
|
||||
},
|
||||
}
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
event, err := watchtools.UntilWithSync(
|
||||
ctx,
|
||||
lw,
|
||||
|
||||
@@ -18,6 +18,7 @@ package workqueue
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
@@ -66,6 +67,8 @@ type delayingType struct {
|
||||
|
||||
// stopCh lets us signal a shutdown to the waiting loop
|
||||
stopCh chan struct{}
|
||||
// stopOnce guarantees we only signal shutdown a single time
|
||||
stopOnce sync.Once
|
||||
|
||||
// heartbeat ensures we wait no more than maxWait before firing
|
||||
heartbeat clock.Ticker
|
||||
@@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} {
|
||||
return pq[0]
|
||||
}
|
||||
|
||||
// ShutDown gives a way to shut off this queue
|
||||
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
||||
// on Get() will be true. This method may be invoked more than once.
|
||||
func (q *delayingType) ShutDown() {
|
||||
q.Interface.ShutDown()
|
||||
close(q.stopCh)
|
||||
q.heartbeat.Stop()
|
||||
q.stopOnce.Do(func() {
|
||||
q.Interface.ShutDown()
|
||||
close(q.stopCh)
|
||||
q.heartbeat.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// AddAfter adds the given item to the work queue after the given delay
|
||||
|
||||
Reference in New Issue
Block a user