Add Kubernetes event log when the pod is launched
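
Multus now builds a Kubernetes EventBroadcaster and EventRecorder alongside its client objects and records a Normal "AddedInterface" event on the pod for each network interface it attaches, including the IPs assigned by the delegate plugin. To support this, the RBAC manifests gain create/patch/update permission on events in both the core ("") and events.k8s.io API groups, TryLoadPodDelegates additionally returns the pod it fetched so callers can attach events to it, and k8s.io/client-go/tools/record (with its groupcache dependency) is vendored in.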

Tomofumi Hayashi 2020-03-04 16:42:29 +09:00 committed by Tomofumi Hayashi
parent bfaf22964b
commit 079c853eba
20 changed files with 1363 additions and 38 deletions

go.mod

@@ -28,5 +28,6 @@ require (
k8s.io/code-generator v0.17.2 // indirect
k8s.io/gengo v0.0.0-20200127102705-1e9b17e831be // indirect
k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c // indirect
k8s.io/klog v1.0.0
k8s.io/kubernetes v1.13.0
)

go.sum

@@ -73,6 +73,7 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

@@ -39,6 +39,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1

@@ -44,6 +44,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1

@@ -39,6 +39,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1

@@ -44,6 +44,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
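
The nine added lines are identical across the four RBAC manifests above and grant event writes in both the legacy core ("") API group and events.k8s.io. Once a manifest is applied, the grant can be spot-checked with something like the following (the service account name and namespace are assumptions; substitute whatever your deployment uses):

kubectl auth can-i create events --as=system:serviceaccount:kube-system:multus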

@@ -24,9 +24,14 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel"
@@ -52,8 +57,10 @@ type NoK8sNetworkError struct {
// ClientInfo contains information given from k8s client
type ClientInfo struct {
Client kubernetes.Interface
NetClient netclient.K8sCniCncfIoV1Interface
Client kubernetes.Interface
NetClient netclient.K8sCniCncfIoV1Interface
EventBroadcaster record.EventBroadcaster
EventRecorder record.EventRecorder
}
// AddPod adds pod into kubernetes
@@ -76,6 +83,13 @@ func (c *ClientInfo) AddNetAttachDef(netattach *nettypes.NetworkAttachmentDefini
return c.NetClient.NetworkAttachmentDefinitions(netattach.ObjectMeta.Namespace).Create(netattach)
}
// Eventf puts event into kubernetes events
func (c *ClientInfo) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
if c != nil && c.EventRecorder != nil {
c.EventRecorder.Eventf(object, eventtype, reason, messageFmt, args...)
}
}
func (e *NoK8sNetworkError) Error() string { return string(e.message) }
// SetNetworkStatus sets network status into Pod annotation
@@ -282,33 +296,33 @@ func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) {
// TryLoadPodDelegates attempts to load Kubernetes-defined delegates and add them to the Multus config.
// Returns the number of Kubernetes-defined delegates added or an error.
func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo *ClientInfo) (int, *ClientInfo, error) {
func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo *ClientInfo) (int, *v1.Pod, *ClientInfo, error) {
var err error
logging.Debugf("TryLoadPodDelegates: %v, %v, %v", k8sArgs, conf, clientInfo)
clientInfo, err = GetK8sClient(conf.Kubeconfig, clientInfo)
if err != nil {
return 0, nil, err
return 0, nil, nil, err
}
if clientInfo == nil {
if len(conf.Delegates) == 0 {
// No available kube client and no delegates, we can't do anything
return 0, nil, logging.Errorf("TryLoadPodDelegates: must have either Kubernetes config or delegates")
return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: must have either Kubernetes config or delegates")
}
return 0, nil, nil
return 0, nil, nil, nil
}
// Get the pod info. If cannot get it, we use cached delegates
pod, err := clientInfo.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
logging.Debugf("TryLoadPodDelegates: Err in loading K8s cluster default network from pod annotation: %v, use cached delegates", err)
return 0, nil, nil
return 0, nil, nil, nil
}
delegate, err := tryLoadK8sPodDefaultNetwork(clientInfo, pod, conf)
if err != nil {
return 0, nil, logging.Errorf("TryLoadPodDelegates: error in loading K8s cluster default network from pod annotation: %v", err)
return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: error in loading K8s cluster default network from pod annotation: %v", err)
}
if delegate != nil {
logging.Debugf("TryLoadPodDelegates: Overwrite the cluster default network with %v from pod annotations", delegate)
@@ -322,13 +336,13 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo
if err != nil {
if _, ok := err.(*NoK8sNetworkError); ok {
return 0, clientInfo, nil
return 0, nil, clientInfo, nil
}
return 0, nil, logging.Errorf("TryLoadPodDelegates: error in getting k8s network from pod: %v", err)
return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: error in getting k8s network from pod: %v", err)
}
if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err
return 0, nil, nil, err
}
// Check gatewayRequest is configured in delegates
@@ -345,10 +359,10 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo
types.CheckGatewayConfig(conf.Delegates)
}
return len(delegates), clientInfo, nil
return len(delegates), pod, clientInfo, nil
}
return 0, clientInfo, nil
return 0, pod, clientInfo, nil
}
// GetK8sClient gets client info from kubeconfig
@@ -396,9 +410,16 @@ func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error
return nil, err
}
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})
return &ClientInfo{
Client: client,
NetClient: netclient,
Client: client,
NetClient: netclient,
EventBroadcaster: broadcaster,
EventRecorder: recorder,
}, nil
}
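
With the broadcaster and recorder wired up in GetK8sClient, anything holding a ClientInfo can record pod events through the nil-safe Eventf wrapper above. A minimal illustrative sketch (not Multus code; the helper name and arguments are made up):

func emitAddedInterface(ci *ClientInfo, pod *v1.Pod, ifName, network string, ips []string) {
	// Eventf no-ops when ci or ci.EventRecorder is nil, so callers need no
	// extra guard for running without a kube client.
	ci.Eventf(pod, v1.EventTypeNormal, "AddedInterface", "Add %s %v from %s", ifName, ips, network)
}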

@@ -617,8 +617,9 @@ var _ = Describe("k8sclient operations", func() {
Expect(netConf.Delegates[0].Conf.Name).To(Equal("net2"))
Expect(netConf.Delegates[0].Conf.Type).To(Equal("mynet2"))
numK8sDelegates, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
numK8sDelegates, pod, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
Expect(err).NotTo(HaveOccurred())
Expect(pod).NotTo(BeNil())
Expect(numK8sDelegates).To(Equal(0))
Expect(netConf.Delegates[0].Conf.Name).To(Equal("net1"))
Expect(netConf.Delegates[0].Conf.Type).To(Equal("mynet1"))
@@ -655,7 +656,7 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).To(HaveOccurred())
netConf.ConfDir = "badfilepath"
_, _, err = TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
Expect(err).To(HaveOccurred())
})
@@ -690,7 +691,7 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
numK8sDelegates, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
numK8sDelegates, _, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
Expect(err).NotTo(HaveOccurred())
Expect(numK8sDelegates).To(Equal(0))
Expect(netConf.Delegates[0].Conf.Name).To(Equal("net1"))
@@ -727,7 +728,7 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).To(HaveOccurred())
})
@@ -760,12 +761,12 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).NotTo(HaveOccurred())
// additionally, we expect the test to fail with no delegates, as at least one is always required.
netConf.Delegates = nil
_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).To(HaveOccurred())
})
@@ -824,7 +825,7 @@ users:
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).NotTo(HaveOccurred())
})

@@ -34,6 +34,7 @@ import (
"github.com/containernetworking/cni/pkg/invoke"
"github.com/containernetworking/cni/pkg/skel"
cnitypes "github.com/containernetworking/cni/pkg/types"
cnicurrent "github.com/containernetworking/cni/pkg/types/current"
cniversion "github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ns"
k8s "github.com/intel/multus-cni/k8sclient"
@@ -43,6 +44,7 @@ import (
nettypes "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
"github.com/vishvananda/netlink"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
@@ -263,7 +265,7 @@ func conflistDel(rt *libcni.RuntimeConf, rawnetconflist []byte, binDir string, e
return err
}
func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetConf, rt *libcni.RuntimeConf, binDir string, cniArgs string) (cnitypes.Result, error) {
func delegateAdd(exec invoke.Exec, kubeClient *k8s.ClientInfo, pod *v1.Pod, ifName string, delegate *types.DelegateNetConf, rt *libcni.RuntimeConf, binDir string, cniArgs string) (cnitypes.Result, error) {
logging.Debugf("delegateAdd: %v, %s, %v, %v, %s", exec, ifName, delegate, rt, binDir)
if os.Setenv("CNI_IFNAME", ifName) != nil {
return nil, logging.Errorf("delegateAdd: error setting environment variable CNI_IFNAME")
@@ -322,7 +324,7 @@ func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetCon
} else {
result, err = confAdd(rt, delegate.Bytes, binDir, exec)
if err != nil {
return nil, logging.Errorf("delegateAdd: error invoking DelegateAdd - %q: %v", delegate.Conf.Type, err)
return nil, logging.Errorf("delegateAdd: error invoking confAdd - %q: %v", delegate.Conf.Type, err)
}
}
@@ -338,6 +340,24 @@ func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetCon
logging.Verbosef("Add: %s:%s:%s:%s %s", rt.Args[1][1], rt.Args[2][1], confName, rt.IfName, string(data))
}
// get IP addresses from result
ips := []string{}
res, err := cnicurrent.NewResultFromResult(result)
if err != nil {
logging.Errorf("delegateAdd: error converting result: %v", err)
return result, nil
}
for _, ip := range res.IPs {
ips = append(ips, ip.Address.String())
}
// send kubernetes events
if delegate.Name != "" {
kubeClient.Eventf(pod, v1.EventTypeNormal, "AddedInterface", "Add %s %v from %s", rt.IfName, ips, delegate.Name)
} else {
kubeClient.Eventf(pod, v1.EventTypeNormal, "AddedInterface", "Add %s %v", rt.IfName, ips)
}
return result, nil
}
@@ -444,6 +464,11 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
return nil, cmdErr(nil, "error loading netconf: %v", err)
}
kubeClient, err = k8s.GetK8sClient(n.Kubeconfig, kubeClient)
if err != nil {
return nil, cmdErr(nil, "error getting k8s client: %v", err)
}
k8sArgs, err := k8s.GetK8sArgs(args)
if err != nil {
return nil, cmdErr(nil, "error getting k8s args: %v", err)
@@ -468,7 +493,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
n.Delegates[0].MasterPlugin = true
}
_, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient)
_, pod, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient)
if err != nil {
return nil, cmdErr(k8sArgs, "error loading k8s delegates k8s args: %v", err)
}
@@ -486,7 +511,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
runtimeConfig := types.MergeCNIRuntimeConfig(n.RuntimeConfig, delegate)
rt := types.CreateCNIRuntimeConf(args, k8sArgs, ifName, runtimeConfig)
tmpResult, err = delegateAdd(exec, ifName, delegate, rt, n.BinDir, cniArgs)
tmpResult, err = delegateAdd(exec, kubeClient, pod, ifName, delegate, rt, n.BinDir, cniArgs)
if err != nil {
// If the add failed, tear down all networks we already added
netName := delegate.Conf.Name
@@ -631,7 +656,7 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er
}
// Get pod annotation and so on
_, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient)
_, _, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient)
if err != nil {
if len(in.Delegates) == 0 {
// No delegate available so send error
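
Note how the event message in delegateAdd folds in the runtime interface name and the IPs parsed from the converted CNI result: a named attachment yields "Add net1 [1.1.1.3/24] from net1" while the unnamed cluster default yields "Add eth0 [1.1.1.2/24]" (exactly the strings asserted in the tests below). A failure to convert the result only logs an error and skips the event, so event recording can never fail the ADD itself.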

@@ -16,6 +16,7 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -25,7 +26,6 @@ import (
"reflect"
"strings"
"testing"
"bytes"
"github.com/containernetworking/cni/pkg/skel"
cnitypes "github.com/containernetworking/cni/pkg/types"
@@ -40,6 +40,7 @@ import (
"github.com/intel/multus-cni/types"
netfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -149,12 +150,12 @@ func (f *fakeExec) ExecPlugin(ctx context.Context, pluginPath string, stdinData
dec := json.NewDecoder(reader)
enc := json.NewEncoder(writer)
err = dec.Decode(&m)
err = dec.Decode(&m)
Expect(err).NotTo(HaveOccurred())
for k := range m {
if k == "prevResult" {
delete(m, k)
}
if k == "prevResult" {
delete(m, k)
}
}
err = enc.Encode(&m)
Expect(err).NotTo(HaveOccurred())
@@ -190,11 +191,26 @@ func (f *fakeExec) FindInPath(plugin string, paths []string) (string, error) {
// NewFakeClientInfo returns fake client (just for testing)
func NewFakeClientInfo() *k8sclient.ClientInfo {
return &k8sclient.ClientInfo{
Client: fake.NewSimpleClientset(),
NetClient: netfake.NewSimpleClientset().K8sCniCncfIoV1(),
Client: fake.NewSimpleClientset(),
NetClient: netfake.NewSimpleClientset().K8sCniCncfIoV1(),
EventRecorder: record.NewFakeRecorder(10),
}
}
func collectEvents(source <-chan string) []string {
done := false
events := make([]string, 0)
for !done {
select {
case ev := <-source:
events = append(events, ev)
default:
done = true
}
}
return events
}
var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
var testNS ns.NetNS
var tmpDir string
@@ -1210,7 +1226,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
_, err = cmdAdd(args, fExec, nil)
Expect(fExec.addIndex).To(Equal(2))
Expect(fExec.delIndex).To(Equal(2))
Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking DelegateAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure"))
Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking confAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure"))
// Cleanup default network file.
if _, errStat := os.Stat(configPath); errStat == nil {
@@ -1391,10 +1407,9 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
r := result.(*current.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())
})
It("executes delegates and kubernetes networks", func() {
It("executes delegates and kubernetes networks with events check", func() {
fakePod := testhelpers.NewFakePod("testpod", "net1,net2", "")
net1 := `{
"name": "net1",
@@ -1477,6 +1492,13 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())
recorder := clientInfo.EventRecorder.(*record.FakeRecorder)
events := collectEvents(recorder.Events)
Expect(len(events)).To(Equal(3))
Expect(events[0]).To(Equal("Normal AddedInterface Add eth0 [1.1.1.2/24]"))
Expect(events[1]).To(Equal("Normal AddedInterface Add net1 [1.1.1.3/24] from net1"))
Expect(events[2]).To(Equal("Normal AddedInterface Add net2 [1.1.1.4/24] from net2"))
})
It("executes kubernetes networks and delete it after pod removal", func() {
@@ -2442,7 +2464,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() {
_, err = cmdAdd(args, fExec, nil)
Expect(fExec.addIndex).To(Equal(2))
Expect(fExec.delIndex).To(Equal(2))
Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking DelegateAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure"))
Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking confAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure"))
// Cleanup default network file.
if _, errStat := os.Stat(configPath); errStat == nil {
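
On a live cluster these same events land on the pod object, so an attach sequence should show up in kubectl describe pod output roughly as follows (a sketch derived from the strings asserted above; ages and ordering will vary):

Events:
  Type    Reason          Age  From    Message
  ----    ------          ---  ----    -------
  Normal  AddedInterface  2s   multus  Add eth0 [1.1.1.2/24]
  Normal  AddedInterface  1s   multus  Add net1 [1.1.1.3/24] from net1
  Normal  AddedInterface  1s   multus  Add net2 [1.1.1.4/24] from net2

The From column matches the EventSource{Component: "multus"} set when the recorder is created.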

@@ -96,6 +96,9 @@ func LoadDelegateNetConf(bytes []byte, net *NetworkSelectionElement, deviceID st
}
if net != nil {
if net.Name != "" {
delegateConf.Name = net.Name
}
if net.InterfaceRequest != "" {
delegateConf.IfnameRequest = net.InterfaceRequest
}

@@ -94,6 +94,7 @@ type NetworkStatus struct {
type DelegateNetConf struct {
Conf types.NetConf
ConfList types.NetConfList
Name string
IfnameRequest string `json:"ifnameRequest,omitempty"`
MacRequest string `json:"macRequest,omitempty"`
IPRequest []string `json:"ipRequest,omitempty"`

vendor/github.com/golang/groupcache/LICENSE (new vendored file)

@@ -0,0 +1,191 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
files.
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE,
NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "[]" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

vendor/github.com/golang/groupcache/lru/lru.go (new vendored file)

@@ -0,0 +1,133 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package lru implements an LRU cache.
package lru
import "container/list"
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specifies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct {
key Key
value interface{}
}
// New creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller.
func New(maxEntries int) *Cache {
return &Cache{
MaxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}
// Remove removes the provided key from the cache.
func (c *Cache) Remove(key Key) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
if c.cache == nil {
return 0
}
return c.ll.Len()
}
// Clear purges all stored items from the cache.
func (c *Cache) Clear() {
if c.OnEvicted != nil {
for _, e := range c.cache {
kv := e.Value.(*entry)
c.OnEvicted(kv.key, kv.value)
}
}
c.ll = nil
c.cache = nil
}
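
This vendored cache backs the event correlator's bookkeeping in events_cache.go below. Its API is small enough for a self-contained usage sketch:

package main

import (
	"fmt"

	"github.com/golang/groupcache/lru"
)

func main() {
	c := lru.New(2) // cache holding at most two entries
	c.Add("a", 1)
	c.Add("b", 2)
	c.Add("c", 3) // evicts "a", the least recently used entry
	if _, ok := c.Get("a"); !ok {
		fmt.Println("a was evicted")
	}
}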

vendor/k8s.io/client-go/tools/record/OWNERS (new vendored file)

@@ -0,0 +1,27 @@
reviewers:
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- derekwaynecarr
- caesarxuchao
- vishh
- mikedanese
- liggitt
- nikhiljindal
- erictune
- pmorie
- dchen1107
- saad-ali
- luxas
- yifan-gu
- eparis
- mwielgus
- timothysc
- jsafrane
- dims
- krousey
- a-robinson
- aveshagarwal
- resouer
- cjcullen

vendor/k8s.io/client-go/tools/record/doc.go (new vendored file)

@@ -0,0 +1,18 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package record has all client logic for recording and reporting events.
package record // import "k8s.io/client-go/tools/record"

vendor/k8s.io/client-go/tools/record/event.go (new vendored file)

@@ -0,0 +1,322 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package record
import (
"fmt"
"math/rand"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
"net/http"
"k8s.io/klog"
)
const maxTriesPerEvent = 12
var defaultSleepDuration = 10 * time.Second
const maxQueuedEvents = 1000
// EventSink knows how to store events (client.Client implements it.)
// EventSink must respect the namespace that will be embedded in 'event'.
// It is assumed that EventSink will return the same sorts of errors as
// pkg/client's REST client.
type EventSink interface {
Create(event *v1.Event) (*v1.Event, error)
Update(event *v1.Event) (*v1.Event, error)
Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, eventtype, reason, message string)
// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
// AnnotatedEventf is just like eventf, but with annotations attached
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartEventWatcher starts sending events received from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if
// desired.
StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
// sink. The return value can be ignored or used to stop recording, if desired.
StartRecordingToSink(sink EventSink) watch.Interface
// StartLogging starts sending events received from this EventBroadcaster to the given logging
// function. The return value can be ignored or used to stop recording, if desired.
StartLogging(logf func(format string, args ...interface{})) watch.Interface
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
}
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
}
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
}
type eventBroadcasterImpl struct {
*watch.Broadcaster
sleepDuration time.Duration
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
// The return value can be ignored or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
eventCorrelator := NewEventCorrelator(clock.RealClock{})
return eventBroadcaster.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
})
}
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
result, err := eventCorrelator.EventCorrelate(event)
if err != nil {
utilruntime.HandleError(err)
}
if result.Skip {
return
}
tries := 0
for {
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}
func isKeyNotFoundError(err error) bool {
statusErr, _ := err.(*errors.StatusError)
if statusErr != nil && statusErr.Status().Code == http.StatusNotFound {
return true
}
return false
}
// recordEvent attempts to write event to a sink. It returns true if the event
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *v1.Event
var err error
if updateExistingEvent {
newEvent, err = sink.Patch(event, patch)
}
// Update can fail because the event may have been removed and it no longer exists.
if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
// Making sure that ResourceVersion is empty on creation
event.ResourceVersion = ""
newEvent, err = sink.Create(event)
}
if err == nil {
// we need to update our event correlator with the server returned state to handle name/resourceversion
eventCorrelator.UpdateState(newEvent)
return true
}
// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
}
return true
case *errors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return false
}
// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return eventBroadcaster.StartEventWatcher(
func(e *v1.Event) {
logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
})
}
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher := eventBroadcaster.Watch()
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event)
}
}()
return watcher
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}
type recorderImpl struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.Clock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
return
}
if !validateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
go func() {
// NOTE: events should be a non-blocking operation
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
func validateEventType(eventtype string) bool {
switch eventtype {
case v1.EventTypeNormal, v1.EventTypeWarning:
return true
}
return false
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: namespace,
Annotations: annotations,
},
InvolvedObject: *ref,
Reason: reason,
Message: message,
FirstTimestamp: t,
LastTimestamp: t,
Count: 1,
Type: eventtype,
}
}
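
In short: NewRecorder's generateEvent posts each event to the broadcaster asynchronously (a full queue drops events, per watch.DropIfChannelFull), StartEventWatcher fans events out to handler goroutines, and StartRecordingToSink runs each one through an EventCorrelator before writing it to the API server, retrying a failed write up to maxTriesPerEvent (12) times with a randomized first sleep so restarting clients do not all sync up against the master.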

vendor/k8s.io/client-go/tools/record/events_cache.go (new vendored file)

@@ -0,0 +1,462 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package record
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/golang/groupcache/lru"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/util/flowcontrol"
)
const (
maxLruCacheEntries = 4096
// if we see the same event that varies only by message
// more than 10 times in a 10 minute period, aggregate the event
defaultAggregateMaxEvents = 10
defaultAggregateIntervalInSeconds = 600
// by default, allow a source to send 25 events about an object
// but control the refill rate to 1 new event every 5 minutes
// this helps control the long-tail of events for things that are always
// unhealthy
defaultSpamBurst = 25
defaultSpamQPS = 1. / 300.
)
// getEventKey builds unique event key based on source, involvedObject, reason, message
func getEventKey(event *v1.Event) string {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
event.InvolvedObject.FieldPath,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Type,
event.Reason,
event.Message,
},
"")
}
// getSpamKey builds unique event key based on source, involvedObject
func getSpamKey(event *v1.Event) string {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
},
"")
}
// EventFilterFunc is a function that returns true if the event should be skipped
type EventFilterFunc func(event *v1.Event) bool
// EventSourceObjectSpamFilter is responsible for throttling
// the amount of events a source and object can produce.
type EventSourceObjectSpamFilter struct {
sync.RWMutex
// the cache that manages last synced state
cache *lru.Cache
// burst is the amount of events we allow per source + object
burst int
// qps is the refill rate of the token bucket in queries per second
qps float32
// clock is used to allow for testing over a time interval
clock clock.Clock
}
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
return &EventSourceObjectSpamFilter{
cache: lru.New(lruCacheSize),
burst: burst,
qps: qps,
clock: clock,
}
}
// spamRecord holds data used to perform spam filtering decisions.
type spamRecord struct {
// rateLimiter controls the rate of events about this object
rateLimiter flowcontrol.RateLimiter
}
// Filter controls that a given source+object are not exceeding the allowed rate.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
var record spamRecord
// controls our cached information about this event (source+object)
eventKey := getSpamKey(event)
// do we have a record of similar events in our cache?
f.Lock()
defer f.Unlock()
value, found := f.cache.Get(eventKey)
if found {
record = value.(spamRecord)
}
// verify we have a rate limiter for this record
if record.rateLimiter == nil {
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
}
// ensure we have available rate
filter := !record.rateLimiter.TryAccept()
// update the cache
f.cache.Add(eventKey, record)
return filter
}
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
// It returns a tuple of the following:
// aggregateKey - key that identifies the aggregate group to bucket this event
// localKey - key that makes this event in the local group
type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Type,
event.Reason,
},
""), event.Message
}
// EventAggregatorMessageFunc is responsible for producing an aggregation message
type EventAggregatorMessageFunc func(event *v1.Event) string
// EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
return "(combined from similar events): " + event.Message
}
// EventAggregator identifies similar events and aggregates them into a single event
type EventAggregator struct {
sync.RWMutex
// The cache that manages aggregation state
cache *lru.Cache
// The function that groups events for aggregation
keyFunc EventAggregatorKeyFunc
// The function that generates a message for an aggregate event
messageFunc EventAggregatorMessageFunc
// The maximum number of events in the specified interval before aggregation occurs
maxEvents uint
// The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
maxIntervalInSeconds uint
// clock is used to allow for testing over a time interval
clock clock.Clock
}
// NewEventAggregator returns a new instance of an EventAggregator
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
return &EventAggregator{
cache: lru.New(lruCacheSize),
keyFunc: keyFunc,
messageFunc: messageFunc,
maxEvents: uint(maxEvents),
maxIntervalInSeconds: uint(maxIntervalInSeconds),
clock: clock,
}
}
// aggregateRecord holds data used to perform aggregation decisions
type aggregateRecord struct {
// we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
// if the size of this set exceeds the max, we know we need to aggregate
localKeys sets.String
// The last time at which the aggregate was recorded
lastTimestamp metav1.Time
}
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
// the full key for normal events, and to the result of
// EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
now := metav1.NewTime(e.clock.Now())
var record aggregateRecord
// eventKey is the full cache key for this event
eventKey := getEventKey(newEvent)
// aggregateKey is for the aggregate event, if one is needed.
aggregateKey, localKey := e.keyFunc(newEvent)
// Do we have a record of similar events in our cache?
e.Lock()
defer e.Unlock()
value, found := e.cache.Get(aggregateKey)
if found {
record = value.(aggregateRecord)
}
// Is the previous record too old? If so, make a fresh one. Note: if we didn't
// find a similar record, its lastTimestamp will be the zero value, so we
// create a new one in that case.
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}
// Write the new event into the aggregation record and put it on the cache
record.localKeys.Insert(localKey)
record.lastTimestamp = now
e.cache.Add(aggregateKey, record)
// If we are not yet over the threshold for unique events, don't correlate them
if uint(record.localKeys.Len()) < e.maxEvents {
return newEvent, eventKey
}
// do not grow our local key set any larger than max
record.localKeys.PopAny()
// create a new aggregate event, and return the aggregateKey as the cache key
// (so that it can be overwritten.)
eventCopy := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
Namespace: newEvent.Namespace,
},
Count: 1,
FirstTimestamp: now,
InvolvedObject: newEvent.InvolvedObject,
LastTimestamp: now,
Message: e.messageFunc(newEvent),
Type: newEvent.Type,
Reason: newEvent.Reason,
Source: newEvent.Source,
}
return eventCopy, aggregateKey
}
// eventLog records data about when an event was observed
type eventLog struct {
// The number of times the event has occurred since first occurrence.
count uint
// The time at which the event was first recorded.
firstTimestamp metav1.Time
// The unique name of the first occurrence of this event
name string
// Resource version returned from previous interaction with server
resourceVersion string
}
// eventLogger logs occurrences of an event
type eventLogger struct {
sync.RWMutex
cache *lru.Cache
clock clock.Clock
}
// newEventLogger observes events and counts their frequencies
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}
// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
var (
patch []byte
err error
)
eventCopy := *newEvent
event := &eventCopy
e.Lock()
defer e.Unlock()
// Check if there is an existing event we should update
lastObservation := e.lastEventObservationFromCache(key)
// If we found a result, prepare a patch
if lastObservation.count > 0 {
// update the event based on the last observation so patch will work as desired
event.Name = lastObservation.name
event.ResourceVersion = lastObservation.resourceVersion
event.FirstTimestamp = lastObservation.firstTimestamp
event.Count = int32(lastObservation.count) + 1
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
eventCopy2.Message = ""
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
}
// record our new observation
e.cache.Add(
key,
eventLog{
count: uint(event.Count),
firstTimestamp: event.FirstTimestamp,
name: event.Name,
resourceVersion: event.ResourceVersion,
},
)
return event, patch, err
}
// updateState updates its internal tracking information based on latest server state
func (e *eventLogger) updateState(event *v1.Event) {
key := getEventKey(event)
e.Lock()
defer e.Unlock()
// record our new observation
e.cache.Add(
key,
eventLog{
count: uint(event.Count),
firstTimestamp: event.FirstTimestamp,
name: event.Name,
resourceVersion: event.ResourceVersion,
},
)
}
// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
value, ok := e.cache.Get(key)
if ok {
observationValue, ok := value.(eventLog)
if ok {
return observationValue
}
}
return eventLog{}
}
// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all
// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
type EventCorrelator struct {
// the function to filter the event
filterFunc EventFilterFunc
// the object that performs event aggregation
aggregator *EventAggregator
// the object that observes events as they come through
logger *eventLogger
}
// EventCorrelateResult is the result of a Correlate
type EventCorrelateResult struct {
// the event after correlation
Event *v1.Event
// if provided, perform a strategic patch when updating the record on the server
Patch []byte
// if true, do no further processing of the event
Skip bool
}
// NewEventCorrelator returns an EventCorrelator configured with default values.
//
// The EventCorrelator is responsible for event filtering, aggregating, and counting
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
// * Aggregation is performed if a similar event is recorded 10 times in a
// 10 minute rolling interval. A similar event is an event that varies only by
// the Event.Message field. Rather than recording the precise event, aggregation
// will create a new event whose message reports that it has combined events with
// the same reason.
// * Events are incrementally counted if the exact same event is encountered multiple
// times.
// * A source may burst 25 events about an object, but has a refill rate budget
// per object of 1 event every 5 minutes to control long-tail of spam.
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
cacheSize := maxLruCacheEntries
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
return &EventCorrelator{
filterFunc: spamFilter.Filter,
aggregator: NewEventAggregator(
cacheSize,
EventAggregatorByReasonFunc,
EventAggregatorByReasonMessageFunc,
defaultAggregateMaxEvents,
defaultAggregateIntervalInSeconds,
clock),
logger: newEventLogger(cacheSize, clock),
}
}
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
if newEvent == nil {
return nil, fmt.Errorf("event is nil")
}
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
if c.filterFunc(observedEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
// UpdateState based on the latest observed state from server
func (c *EventCorrelator) UpdateState(event *v1.Event) {
c.logger.updateState(event)
}
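
With the defaults above, each component tracks up to 4096 LRU event keys, collapses similar events into one aggregate once 10 of them arrive inside a 10 minute window, and rate-limits each source+object pair to a burst of 25 events refilled at one event per 5 minutes.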

vendor/k8s.io/client-go/tools/record/fake.go (new vendored file)

@@ -0,0 +1,58 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package record
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// FakeRecorder is used as a fake during tests. It is thread safe. It is usable
// when created manually and not by NewFakeRecorder, however all events may be
// thrown away in this case.
type FakeRecorder struct {
Events chan string
}
func (f *FakeRecorder) Event(object runtime.Object, eventtype, reason, message string) {
if f.Events != nil {
f.Events <- fmt.Sprintf("%s %s %s", eventtype, reason, message)
}
}
func (f *FakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
if f.Events != nil {
f.Events <- fmt.Sprintf(eventtype+" "+reason+" "+messageFmt, args...)
}
}
func (f *FakeRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}
func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
f.Eventf(object, eventtype, reason, messageFmt, args...)
}
// NewFakeRecorder creates new fake event recorder with event channel with
// buffer of given size.
func NewFakeRecorder(bufferSize int) *FakeRecorder {
return &FakeRecorder{
Events: make(chan string, bufferSize),
}
}
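
This FakeRecorder is what NewFakeClientInfo plugs into ClientInfo in the Multus tests above. A self-contained sketch of the round trip (the event values are illustrative):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	rec := record.NewFakeRecorder(10) // buffered channel of up to 10 event strings
	// FakeRecorder renders each event as "<type> <reason> <message>" and ignores the object.
	rec.Eventf(nil, v1.EventTypeNormal, "AddedInterface", "Add %s %v", "eth0", []string{"1.1.1.2/24"})
	fmt.Println(<-rec.Events) // Normal AddedInterface Add eth0 [1.1.1.2/24]
}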

vendor/modules.txt

@@ -21,6 +21,8 @@ github.com/gogo/protobuf/gogoproto
github.com/gogo/protobuf/proto
github.com/gogo/protobuf/protoc-gen-gogo/descriptor
github.com/gogo/protobuf/sortkeys
# github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6
github.com/golang/groupcache/lru
# github.com/golang/protobuf v1.3.2
github.com/golang/protobuf/proto
github.com/golang/protobuf/ptypes
@@ -356,6 +358,7 @@ k8s.io/client-go/tools/clientcmd/api
k8s.io/client-go/tools/clientcmd/api/latest
k8s.io/client-go/tools/clientcmd/api/v1
k8s.io/client-go/tools/metrics
k8s.io/client-go/tools/record
k8s.io/client-go/tools/reference
k8s.io/client-go/transport
k8s.io/client-go/util/cert