diff --git a/go.mod b/go.mod index 0a326de53..28078728e 100644 --- a/go.mod +++ b/go.mod @@ -28,5 +28,6 @@ require ( k8s.io/code-generator v0.17.2 // indirect k8s.io/gengo v0.0.0-20200127102705-1e9b17e831be // indirect k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c // indirect + k8s.io/klog v1.0.0 k8s.io/kubernetes v1.13.0 ) diff --git a/go.sum b/go.sum index a26a24b03..e4c4ce8e0 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,7 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/images/multus-daemonset-crio-pre1.16.yml b/images/multus-daemonset-crio-pre1.16.yml index 313080f9e..4b8a02c72 100644 --- a/images/multus-daemonset-crio-pre1.16.yml +++ b/images/multus-daemonset-crio-pre1.16.yml @@ -39,6 +39,15 @@ rules: verbs: - get - update + - apiGroups: + - "" + - events.k8s.io + resources: + - events + verbs: + - create + - patch + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/images/multus-daemonset-crio.yml b/images/multus-daemonset-crio.yml index 7aad5ffae..f9a11901e 100644 --- a/images/multus-daemonset-crio.yml +++ b/images/multus-daemonset-crio.yml @@ -44,6 +44,15 @@ rules: verbs: - get - update + - apiGroups: + - "" + - events.k8s.io + resources: + - events + verbs: + - create + - patch + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/images/multus-daemonset-pre-1.16.yml b/images/multus-daemonset-pre-1.16.yml index 93c1351d5..36b1047af 100644 --- a/images/multus-daemonset-pre-1.16.yml +++ b/images/multus-daemonset-pre-1.16.yml @@ -39,6 +39,15 @@ rules: verbs: - get - update + - apiGroups: + - "" + - events.k8s.io + resources: + - events + verbs: + - create + - patch + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/images/multus-daemonset.yml b/images/multus-daemonset.yml index a58750d95..3649136ae 100644 --- a/images/multus-daemonset.yml +++ b/images/multus-daemonset.yml @@ -44,6 +44,15 @@ rules: verbs: - get - update + - apiGroups: + - "" + - events.k8s.io + resources: + - events + verbs: + - create + - patch + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/k8sclient/k8sclient.go b/k8sclient/k8sclient.go index 4b2a94fbd..55a6db88c 100644 --- a/k8sclient/k8sclient.go +++ b/k8sclient/k8sclient.go @@ -24,9 +24,14 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" + "k8s.io/klog" "github.com/containernetworking/cni/libcni" "github.com/containernetworking/cni/pkg/skel" @@ -52,8 +57,10 @@ type NoK8sNetworkError struct { // ClientInfo contains information given from k8s client type ClientInfo struct { - Client kubernetes.Interface - NetClient netclient.K8sCniCncfIoV1Interface + Client kubernetes.Interface + NetClient netclient.K8sCniCncfIoV1Interface + EventBroadcaster record.EventBroadcaster + EventRecorder record.EventRecorder } // AddPod adds pod into kubernetes @@ -76,6 +83,13 @@ func (c *ClientInfo) AddNetAttachDef(netattach *nettypes.NetworkAttachmentDefini return c.NetClient.NetworkAttachmentDefinitions(netattach.ObjectMeta.Namespace).Create(netattach) } +// Eventf puts event into kubernetes events +func (c *ClientInfo) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + if c != nil && c.EventRecorder != nil { + c.EventRecorder.Eventf(object, eventtype, reason, messageFmt, args...) + } +} + func (e *NoK8sNetworkError) Error() string { return string(e.message) } // SetNetworkStatus sets network status into Pod annotation @@ -282,33 +296,33 @@ func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) { // TryLoadPodDelegates attempts to load Kubernetes-defined delegates and add them to the Multus config. // Returns the number of Kubernetes-defined delegates added or an error. -func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo *ClientInfo) (int, *ClientInfo, error) { +func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo *ClientInfo) (int, *v1.Pod, *ClientInfo, error) { var err error logging.Debugf("TryLoadPodDelegates: %v, %v, %v", k8sArgs, conf, clientInfo) clientInfo, err = GetK8sClient(conf.Kubeconfig, clientInfo) if err != nil { - return 0, nil, err + return 0, nil, nil, err } if clientInfo == nil { if len(conf.Delegates) == 0 { // No available kube client and no delegates, we can't do anything - return 0, nil, logging.Errorf("TryLoadPodDelegates: must have either Kubernetes config or delegates") + return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: must have either Kubernetes config or delegates") } - return 0, nil, nil + return 0, nil, nil, nil } // Get the pod info. If cannot get it, we use cached delegates pod, err := clientInfo.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) if err != nil { logging.Debugf("TryLoadPodDelegates: Err in loading K8s cluster default network from pod annotation: %v, use cached delegates", err) - return 0, nil, nil + return 0, nil, nil, nil } delegate, err := tryLoadK8sPodDefaultNetwork(clientInfo, pod, conf) if err != nil { - return 0, nil, logging.Errorf("TryLoadPodDelegates: error in loading K8s cluster default network from pod annotation: %v", err) + return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: error in loading K8s cluster default network from pod annotation: %v", err) } if delegate != nil { logging.Debugf("TryLoadPodDelegates: Overwrite the cluster default network with %v from pod annotations", delegate) @@ -322,13 +336,13 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo if err != nil { if _, ok := err.(*NoK8sNetworkError); ok { - return 0, clientInfo, nil + return 0, nil, clientInfo, nil } - return 0, nil, logging.Errorf("TryLoadPodDelegates: error in getting k8s network from pod: %v", err) + return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: error in getting k8s network from pod: %v", err) } if err = conf.AddDelegates(delegates); err != nil { - return 0, nil, err + return 0, nil, nil, err } // Check gatewayRequest is configured in delegates @@ -345,10 +359,10 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo types.CheckGatewayConfig(conf.Delegates) } - return len(delegates), clientInfo, nil + return len(delegates), pod, clientInfo, nil } - return 0, clientInfo, nil + return 0, pod, clientInfo, nil } // GetK8sClient gets client info from kubeconfig @@ -396,9 +410,16 @@ func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error return nil, err } + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(klog.Infof) + broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) + recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"}) + return &ClientInfo{ - Client: client, - NetClient: netclient, + Client: client, + NetClient: netclient, + EventBroadcaster: broadcaster, + EventRecorder: recorder, }, nil } diff --git a/k8sclient/k8sclient_test.go b/k8sclient/k8sclient_test.go index b02841c66..64eaa286d 100644 --- a/k8sclient/k8sclient_test.go +++ b/k8sclient/k8sclient_test.go @@ -617,8 +617,9 @@ var _ = Describe("k8sclient operations", func() { Expect(netConf.Delegates[0].Conf.Name).To(Equal("net2")) Expect(netConf.Delegates[0].Conf.Type).To(Equal("mynet2")) - numK8sDelegates, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo) + numK8sDelegates, pod, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo) Expect(err).NotTo(HaveOccurred()) + Expect(pod).NotTo(BeNil()) Expect(numK8sDelegates).To(Equal(0)) Expect(netConf.Delegates[0].Conf.Name).To(Equal("net1")) Expect(netConf.Delegates[0].Conf.Type).To(Equal("mynet1")) @@ -655,7 +656,7 @@ var _ = Describe("k8sclient operations", func() { Expect(err).To(HaveOccurred()) netConf.ConfDir = "badfilepath" - _, _, err = TryLoadPodDelegates(k8sArgs, netConf, clientInfo) + _, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, clientInfo) Expect(err).To(HaveOccurred()) }) @@ -690,7 +691,7 @@ var _ = Describe("k8sclient operations", func() { k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - numK8sDelegates, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo) + numK8sDelegates, _, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo) Expect(err).NotTo(HaveOccurred()) Expect(numK8sDelegates).To(Equal(0)) Expect(netConf.Delegates[0].Conf.Name).To(Equal("net1")) @@ -727,7 +728,7 @@ var _ = Describe("k8sclient operations", func() { k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) + _, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) Expect(err).To(HaveOccurred()) }) @@ -760,12 +761,12 @@ var _ = Describe("k8sclient operations", func() { k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) + _, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) Expect(err).NotTo(HaveOccurred()) // additionally, we expect the test to fail with no delegates, as at least one is always required. netConf.Delegates = nil - _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) + _, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) Expect(err).To(HaveOccurred()) }) @@ -824,7 +825,7 @@ users: k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) + _, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil) Expect(err).NotTo(HaveOccurred()) }) diff --git a/multus/multus.go b/multus/multus.go index fc27f9d27..48225e57a 100644 --- a/multus/multus.go +++ b/multus/multus.go @@ -34,6 +34,7 @@ import ( "github.com/containernetworking/cni/pkg/invoke" "github.com/containernetworking/cni/pkg/skel" cnitypes "github.com/containernetworking/cni/pkg/types" + cnicurrent "github.com/containernetworking/cni/pkg/types/current" cniversion "github.com/containernetworking/cni/pkg/version" "github.com/containernetworking/plugins/pkg/ns" k8s "github.com/intel/multus-cni/k8sclient" @@ -43,6 +44,7 @@ import ( nettypes "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils" "github.com/vishvananda/netlink" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" ) @@ -263,7 +265,7 @@ func conflistDel(rt *libcni.RuntimeConf, rawnetconflist []byte, binDir string, e return err } -func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetConf, rt *libcni.RuntimeConf, binDir string, cniArgs string) (cnitypes.Result, error) { +func delegateAdd(exec invoke.Exec, kubeClient *k8s.ClientInfo, pod *v1.Pod, ifName string, delegate *types.DelegateNetConf, rt *libcni.RuntimeConf, binDir string, cniArgs string) (cnitypes.Result, error) { logging.Debugf("delegateAdd: %v, %s, %v, %v, %s", exec, ifName, delegate, rt, binDir) if os.Setenv("CNI_IFNAME", ifName) != nil { return nil, logging.Errorf("delegateAdd: error setting envionment variable CNI_IFNAME") @@ -322,7 +324,7 @@ func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetCon } else { result, err = confAdd(rt, delegate.Bytes, binDir, exec) if err != nil { - return nil, logging.Errorf("delegateAdd: error invoking DelegateAdd - %q: %v", delegate.Conf.Type, err) + return nil, logging.Errorf("delegateAdd: error invoking confAdd - %q: %v", delegate.Conf.Type, err) } } @@ -338,6 +340,24 @@ func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetCon logging.Verbosef("Add: %s:%s:%s:%s %s", rt.Args[1][1], rt.Args[2][1], confName, rt.IfName, string(data)) } + // get IP addresses from result + ips := []string{} + res, err := cnicurrent.NewResultFromResult(result) + if err != nil { + logging.Errorf("delegateAdd: error converting result: %v", err) + return result, nil + } + for _, ip := range res.IPs { + ips = append(ips, ip.Address.String()) + } + + // send kubernetes events + if delegate.Name != "" { + kubeClient.Eventf(pod, v1.EventTypeNormal, "AddedInterface", "Add %s %v from %s", rt.IfName, ips, delegate.Name) + } else { + kubeClient.Eventf(pod, v1.EventTypeNormal, "AddedInterface", "Add %s %v", rt.IfName, ips) + } + return result, nil } @@ -444,6 +464,11 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c return nil, cmdErr(nil, "error loading netconf: %v", err) } + kubeClient, err = k8s.GetK8sClient(n.Kubeconfig, kubeClient) + if err != nil { + return nil, cmdErr(nil, "error getting k8s client: %v", err) + } + k8sArgs, err := k8s.GetK8sArgs(args) if err != nil { return nil, cmdErr(nil, "error getting k8s args: %v", err) @@ -468,7 +493,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c n.Delegates[0].MasterPlugin = true } - _, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient) + _, pod, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient) if err != nil { return nil, cmdErr(k8sArgs, "error loading k8s delegates k8s args: %v", err) } @@ -486,7 +511,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c runtimeConfig := types.MergeCNIRuntimeConfig(n.RuntimeConfig, delegate) rt := types.CreateCNIRuntimeConf(args, k8sArgs, ifName, runtimeConfig) - tmpResult, err = delegateAdd(exec, ifName, delegate, rt, n.BinDir, cniArgs) + tmpResult, err = delegateAdd(exec, kubeClient, pod, ifName, delegate, rt, n.BinDir, cniArgs) if err != nil { // If the add failed, tear down all networks we already added netName := delegate.Conf.Name @@ -631,7 +656,7 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er } // Get pod annotation and so on - _, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient) + _, _, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient) if err != nil { if len(in.Delegates) == 0 { // No delegate available so send error diff --git a/multus/multus_test.go b/multus/multus_test.go index a1969efbf..e6257e289 100644 --- a/multus/multus_test.go +++ b/multus/multus_test.go @@ -16,6 +16,7 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" @@ -25,7 +26,6 @@ import ( "reflect" "strings" "testing" - "bytes" "github.com/containernetworking/cni/pkg/skel" cnitypes "github.com/containernetworking/cni/pkg/types" @@ -40,6 +40,7 @@ import ( "github.com/intel/multus-cni/types" netfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -149,12 +150,12 @@ func (f *fakeExec) ExecPlugin(ctx context.Context, pluginPath string, stdinData dec := json.NewDecoder(reader) enc := json.NewEncoder(writer) - err = dec.Decode(&m) + err = dec.Decode(&m) Expect(err).NotTo(HaveOccurred()) for k := range m { - if k == "prevResult" { - delete(m, k) - } + if k == "prevResult" { + delete(m, k) + } } err = enc.Encode(&m) Expect(err).NotTo(HaveOccurred()) @@ -190,11 +191,26 @@ func (f *fakeExec) FindInPath(plugin string, paths []string) (string, error) { // NewFakeClientInfo returns fake client (just for testing) func NewFakeClientInfo() *k8sclient.ClientInfo { return &k8sclient.ClientInfo{ - Client: fake.NewSimpleClientset(), - NetClient: netfake.NewSimpleClientset().K8sCniCncfIoV1(), + Client: fake.NewSimpleClientset(), + NetClient: netfake.NewSimpleClientset().K8sCniCncfIoV1(), + EventRecorder: record.NewFakeRecorder(10), } } +func collectEvents(source <-chan string) []string { + done := false + events := make([]string, 0) + for !done { + select { + case ev := <-source: + events = append(events, ev) + default: + done = true + } + } + return events +} + var _ = Describe("multus operations cniVersion 0.2.0 config", func() { var testNS ns.NetNS var tmpDir string @@ -1210,7 +1226,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { _, err = cmdAdd(args, fExec, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) - Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking DelegateAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure")) + Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking confAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure")) // Cleanup default network file. if _, errStat := os.Stat(configPath); errStat == nil { @@ -1391,10 +1407,9 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { r := result.(*current.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - }) - It("executes delegates and kubernetes networks", func() { + It("executes delegates and kubernetes networks with events check", func() { fakePod := testhelpers.NewFakePod("testpod", "net1,net2", "") net1 := `{ "name": "net1", @@ -1477,6 +1492,13 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { r := result.(*types020.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) + + recorder := clientInfo.EventRecorder.(*record.FakeRecorder) + events := collectEvents(recorder.Events) + Expect(len(events)).To(Equal(3)) + Expect(events[0]).To(Equal("Normal AddedInterface Add eth0 [1.1.1.2/24]")) + Expect(events[1]).To(Equal("Normal AddedInterface Add net1 [1.1.1.3/24] from net1")) + Expect(events[2]).To(Equal("Normal AddedInterface Add net2 [1.1.1.4/24] from net2")) }) It("executes kubernetes networks and delete it after pod removal", func() { @@ -2442,7 +2464,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { _, err = cmdAdd(args, fExec, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) - Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking DelegateAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure")) + Expect(err).To(MatchError("Multus: [/]: error adding container to network \"other1\": delegateAdd: error invoking confAdd - \"other-plugin\": error in getting result from AddNetwork: expected plugin failure")) // Cleanup default network file. if _, errStat := os.Stat(configPath); errStat == nil { diff --git a/types/conf.go b/types/conf.go index 6c78dba30..080e9ea5b 100644 --- a/types/conf.go +++ b/types/conf.go @@ -96,6 +96,9 @@ func LoadDelegateNetConf(bytes []byte, net *NetworkSelectionElement, deviceID st } if net != nil { + if net.Name != "" { + delegateConf.Name = net.Name + } if net.InterfaceRequest != "" { delegateConf.IfnameRequest = net.InterfaceRequest } diff --git a/types/types.go b/types/types.go index e00003f24..2c569e580 100644 --- a/types/types.go +++ b/types/types.go @@ -94,6 +94,7 @@ type NetworkStatus struct { type DelegateNetConf struct { Conf types.NetConf ConfList types.NetConfList + Name string IfnameRequest string `json:"ifnameRequest,omitempty"` MacRequest string `json:"macRequest,omitempty"` IPRequest []string `json:"ipRequest,omitempty"` diff --git a/vendor/github.com/golang/groupcache/LICENSE b/vendor/github.com/golang/groupcache/LICENSE new file mode 100644 index 000000000..37ec93a14 --- /dev/null +++ b/vendor/github.com/golang/groupcache/LICENSE @@ -0,0 +1,191 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, and +distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by the copyright +owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all other entities +that control, are controlled by, or are under common control with that entity. +For the purposes of this definition, "control" means (i) the power, direct or +indirect, to cause the direction or management of such entity, whether by +contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity exercising +permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, including +but not limited to software source code, documentation source, and configuration +files. + +"Object" form shall mean any form resulting from mechanical transformation or +translation of a Source form, including but not limited to compiled object code, +generated documentation, and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or Object form, made +available under the License, as indicated by a copyright notice that is included +in or attached to the work (an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object form, that +is based on (or derived from) the Work and for which the editorial revisions, +annotations, elaborations, or other modifications represent, as a whole, an +original work of authorship. For the purposes of this License, Derivative Works +shall not include works that remain separable from, or merely link (or bind by +name) to the interfaces of, the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including the original version +of the Work and any modifications or additions to that Work or Derivative Works +thereof, that is intentionally submitted to Licensor for inclusion in the Work +by the copyright owner or by an individual or Legal Entity authorized to submit +on behalf of the copyright owner. For the purposes of this definition, +"submitted" means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, and +issue tracking systems that are managed by, or on behalf of, the Licensor for +the purpose of discussing and improving the Work, but excluding communication +that is conspicuously marked or otherwise designated in writing by the copyright +owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity on behalf +of whom a Contribution has been received by Licensor and subsequently +incorporated within the Work. + +2. Grant of Copyright License. + +Subject to the terms and conditions of this License, each Contributor hereby +grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, +irrevocable copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the Work and such +Derivative Works in Source or Object form. + +3. Grant of Patent License. + +Subject to the terms and conditions of this License, each Contributor hereby +grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, +irrevocable (except as stated in this section) patent license to make, have +made, use, offer to sell, sell, import, and otherwise transfer the Work, where +such license applies only to those patent claims licensable by such Contributor +that are necessarily infringed by their Contribution(s) alone or by combination +of their Contribution(s) with the Work to which such Contribution(s) was +submitted. If You institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work or a +Contribution incorporated within the Work constitutes direct or contributory +patent infringement, then any patent licenses granted to You under this License +for that Work shall terminate as of the date such litigation is filed. + +4. Redistribution. + +You may reproduce and distribute copies of the Work or Derivative Works thereof +in any medium, with or without modifications, and in Source or Object form, +provided that You meet the following conditions: + +You must give any other recipients of the Work or Derivative Works a copy of +this License; and +You must cause any modified files to carry prominent notices stating that You +changed the files; and +You must retain, in the Source form of any Derivative Works that You distribute, +all copyright, patent, trademark, and attribution notices from the Source form +of the Work, excluding those notices that do not pertain to any part of the +Derivative Works; and +If the Work includes a "NOTICE" text file as part of its distribution, then any +Derivative Works that You distribute must include a readable copy of the +attribution notices contained within such NOTICE file, excluding those notices +that do not pertain to any part of the Derivative Works, in at least one of the +following places: within a NOTICE text file distributed as part of the +Derivative Works; within the Source form or documentation, if provided along +with the Derivative Works; or, within a display generated by the Derivative +Works, if and wherever such third-party notices normally appear. The contents of +the NOTICE file are for informational purposes only and do not modify the +License. You may add Your own attribution notices within Derivative Works that +You distribute, alongside or as an addendum to the NOTICE text from the Work, +provided that such additional attribution notices cannot be construed as +modifying the License. +You may add Your own copyright statement to Your modifications and may provide +additional or different license terms and conditions for use, reproduction, or +distribution of Your modifications, or for any such Derivative Works as a whole, +provided Your use, reproduction, and distribution of the Work otherwise complies +with the conditions stated in this License. + +5. Submission of Contributions. + +Unless You explicitly state otherwise, any Contribution intentionally submitted +for inclusion in the Work by You to the Licensor shall be under the terms and +conditions of this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify the terms of +any separate license agreement you may have executed with Licensor regarding +such Contributions. + +6. Trademarks. + +This License does not grant permission to use the trade names, trademarks, +service marks, or product names of the Licensor, except as required for +reasonable and customary use in describing the origin of the Work and +reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. + +Unless required by applicable law or agreed to in writing, Licensor provides the +Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, +including, without limitation, any warranties or conditions of TITLE, +NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are +solely responsible for determining the appropriateness of using or +redistributing the Work and assume any risks associated with Your exercise of +permissions under this License. + +8. Limitation of Liability. + +In no event and under no legal theory, whether in tort (including negligence), +contract, or otherwise, unless required by applicable law (such as deliberate +and grossly negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, incidental, +or consequential damages of any character arising as a result of this License or +out of the use or inability to use the Work (including but not limited to +damages for loss of goodwill, work stoppage, computer failure or malfunction, or +any and all other commercial damages or losses), even if such Contributor has +been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. + +While redistributing the Work or Derivative Works thereof, You may choose to +offer, and charge a fee for, acceptance of support, warranty, indemnity, or +other liability obligations and/or rights consistent with this License. However, +in accepting such obligations, You may act only on Your own behalf and on Your +sole responsibility, not on behalf of any other Contributor, and only if You +agree to indemnify, defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason of your +accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work + +To apply the Apache License to your work, attach the following boilerplate +notice, with the fields enclosed by brackets "[]" replaced with your own +identifying information. (Don't include the brackets!) The text should be +enclosed in the appropriate comment syntax for the file format. We also +recommend that a file or class name and description of purpose be included on +the same "printed page" as the copyright notice for easier identification within +third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/golang/groupcache/lru/lru.go b/vendor/github.com/golang/groupcache/lru/lru.go new file mode 100644 index 000000000..eac1c7664 --- /dev/null +++ b/vendor/github.com/golang/groupcache/lru/lru.go @@ -0,0 +1,133 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package lru implements an LRU cache. +package lru + +import "container/list" + +// Cache is an LRU cache. It is not safe for concurrent access. +type Cache struct { + // MaxEntries is the maximum number of cache entries before + // an item is evicted. Zero means no limit. + MaxEntries int + + // OnEvicted optionally specifies a callback function to be + // executed when an entry is purged from the cache. + OnEvicted func(key Key, value interface{}) + + ll *list.List + cache map[interface{}]*list.Element +} + +// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators +type Key interface{} + +type entry struct { + key Key + value interface{} +} + +// New creates a new Cache. +// If maxEntries is zero, the cache has no limit and it's assumed +// that eviction is done by the caller. +func New(maxEntries int) *Cache { + return &Cache{ + MaxEntries: maxEntries, + ll: list.New(), + cache: make(map[interface{}]*list.Element), + } +} + +// Add adds a value to the cache. +func (c *Cache) Add(key Key, value interface{}) { + if c.cache == nil { + c.cache = make(map[interface{}]*list.Element) + c.ll = list.New() + } + if ee, ok := c.cache[key]; ok { + c.ll.MoveToFront(ee) + ee.Value.(*entry).value = value + return + } + ele := c.ll.PushFront(&entry{key, value}) + c.cache[key] = ele + if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { + c.RemoveOldest() + } +} + +// Get looks up a key's value from the cache. +func (c *Cache) Get(key Key) (value interface{}, ok bool) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.ll.MoveToFront(ele) + return ele.Value.(*entry).value, true + } + return +} + +// Remove removes the provided key from the cache. +func (c *Cache) Remove(key Key) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.removeElement(ele) + } +} + +// RemoveOldest removes the oldest item from the cache. +func (c *Cache) RemoveOldest() { + if c.cache == nil { + return + } + ele := c.ll.Back() + if ele != nil { + c.removeElement(ele) + } +} + +func (c *Cache) removeElement(e *list.Element) { + c.ll.Remove(e) + kv := e.Value.(*entry) + delete(c.cache, kv.key) + if c.OnEvicted != nil { + c.OnEvicted(kv.key, kv.value) + } +} + +// Len returns the number of items in the cache. +func (c *Cache) Len() int { + if c.cache == nil { + return 0 + } + return c.ll.Len() +} + +// Clear purges all stored items from the cache. +func (c *Cache) Clear() { + if c.OnEvicted != nil { + for _, e := range c.cache { + kv := e.Value.(*entry) + c.OnEvicted(kv.key, kv.value) + } + } + c.ll = nil + c.cache = nil +} diff --git a/vendor/k8s.io/client-go/tools/record/OWNERS b/vendor/k8s.io/client-go/tools/record/OWNERS new file mode 100644 index 000000000..4dd54bbce --- /dev/null +++ b/vendor/k8s.io/client-go/tools/record/OWNERS @@ -0,0 +1,27 @@ +reviewers: +- lavalamp +- smarterclayton +- wojtek-t +- deads2k +- derekwaynecarr +- caesarxuchao +- vishh +- mikedanese +- liggitt +- nikhiljindal +- erictune +- pmorie +- dchen1107 +- saad-ali +- luxas +- yifan-gu +- eparis +- mwielgus +- timothysc +- jsafrane +- dims +- krousey +- a-robinson +- aveshagarwal +- resouer +- cjcullen diff --git a/vendor/k8s.io/client-go/tools/record/doc.go b/vendor/k8s.io/client-go/tools/record/doc.go new file mode 100644 index 000000000..657ddecbc --- /dev/null +++ b/vendor/k8s.io/client-go/tools/record/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package record has all client logic for recording and reporting events. +package record // import "k8s.io/client-go/tools/record" diff --git a/vendor/k8s.io/client-go/tools/record/event.go b/vendor/k8s.io/client-go/tools/record/event.go new file mode 100644 index 000000000..2ee69589c --- /dev/null +++ b/vendor/k8s.io/client-go/tools/record/event.go @@ -0,0 +1,322 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record + +import ( + "fmt" + "math/rand" + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" + ref "k8s.io/client-go/tools/reference" + + "net/http" + + "k8s.io/klog" +) + +const maxTriesPerEvent = 12 + +var defaultSleepDuration = 10 * time.Second + +const maxQueuedEvents = 1000 + +// EventSink knows how to store events (client.Client implements it.) +// EventSink must respect the namespace that will be embedded in 'event'. +// It is assumed that EventSink will return the same sorts of errors as +// pkg/client's REST client. +type EventSink interface { + Create(event *v1.Event) (*v1.Event, error) + Update(event *v1.Event) (*v1.Event, error) + Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error) +} + +// EventRecorder knows how to record events on behalf of an EventSource. +type EventRecorder interface { + // Event constructs an event from the given information and puts it in the queue for sending. + // 'object' is the object this event is about. Event will make a reference-- or you may also + // pass a reference to the object directly. + // 'type' of this event, and can be one of Normal, Warning. New types could be added in future + // 'reason' is the reason this event is generated. 'reason' should be short and unique; it + // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used + // to automate handling of events, so imagine people writing switch statements to handle them. + // You want to make that easy. + // 'message' is intended to be human readable. + // + // The resulting event will be created in the same namespace as the reference object. + Event(object runtime.Object, eventtype, reason, message string) + + // Eventf is just like Event, but with Sprintf for the message field. + Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) + + // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field. + PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) + + // AnnotatedEventf is just like eventf, but with annotations attached + AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) +} + +// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. +type EventBroadcaster interface { + // StartEventWatcher starts sending events received from this EventBroadcaster to the given + // event handler function. The return value can be ignored or used to stop recording, if + // desired. + StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface + + // StartRecordingToSink starts sending events received from this EventBroadcaster to the given + // sink. The return value can be ignored or used to stop recording, if desired. + StartRecordingToSink(sink EventSink) watch.Interface + + // StartLogging starts sending events received from this EventBroadcaster to the given logging + // function. The return value can be ignored or used to stop recording, if desired. + StartLogging(logf func(format string, args ...interface{})) watch.Interface + + // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster + // with the event source set to the given event source. + NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder +} + +// Creates a new event broadcaster. +func NewBroadcaster() EventBroadcaster { + return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration} +} + +func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { + return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration} +} + +type eventBroadcasterImpl struct { + *watch.Broadcaster + sleepDuration time.Duration +} + +// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. +// The return value can be ignored or used to stop recording, if desired. +// TODO: make me an object with parameterizable queue length and retry interval +func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface { + // The default math/rand package functions aren't thread safe, so create a + // new Rand object for each StartRecording call. + randGen := rand.New(rand.NewSource(time.Now().UnixNano())) + eventCorrelator := NewEventCorrelator(clock.RealClock{}) + return eventBroadcaster.StartEventWatcher( + func(event *v1.Event) { + recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration) + }) +} + +func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) { + // Make a copy before modification, because there could be multiple listeners. + // Events are safe to copy like this. + eventCopy := *event + event = &eventCopy + result, err := eventCorrelator.EventCorrelate(event) + if err != nil { + utilruntime.HandleError(err) + } + if result.Skip { + return + } + tries := 0 + for { + if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { + break + } + tries++ + if tries >= maxTriesPerEvent { + klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) + } else { + time.Sleep(sleepDuration) + } + } +} + +func isKeyNotFoundError(err error) bool { + statusErr, _ := err.(*errors.StatusError) + + if statusErr != nil && statusErr.Status().Code == http.StatusNotFound { + return true + } + + return false +} + +// recordEvent attempts to write event to a sink. It returns true if the event +// was successfully recorded or discarded, false if it should be retried. +// If updateExistingEvent is false, it creates a new event, otherwise it updates +// existing event. +func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool { + var newEvent *v1.Event + var err error + if updateExistingEvent { + newEvent, err = sink.Patch(event, patch) + } + // Update can fail because the event may have been removed and it no longer exists. + if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) { + // Making sure that ResourceVersion is empty on creation + event.ResourceVersion = "" + newEvent, err = sink.Create(event) + } + if err == nil { + // we need to update our event correlator with the server returned state to handle name/resourceversion + eventCorrelator.UpdateState(newEvent) + return true + } + + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + switch err.(type) { + case *restclient.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err) + return true + case *errors.StatusError: + if errors.IsAlreadyExists(err) { + klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err) + } else { + klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err) + } + return true + case *errors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return false +} + +// StartLogging starts sending events received from this EventBroadcaster to the given logging function. +// The return value can be ignored or used to stop recording, if desired. +func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface { + return eventBroadcaster.StartEventWatcher( + func(e *v1.Event) { + logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message) + }) +} + +// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. +// The return value can be ignored or used to stop recording, if desired. +func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { + watcher := eventBroadcaster.Watch() + go func() { + defer utilruntime.HandleCrash() + for watchEvent := range watcher.ResultChan() { + event, ok := watchEvent.Object.(*v1.Event) + if !ok { + // This is all local, so there's no reason this should + // ever happen. + continue + } + eventHandler(event) + } + }() + return watcher +} + +// NewRecorder returns an EventRecorder that records events with the given event source. +func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder { + return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}} +} + +type recorderImpl struct { + scheme *runtime.Scheme + source v1.EventSource + *watch.Broadcaster + clock clock.Clock +} + +func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) { + ref, err := ref.GetReference(recorder.scheme, object) + if err != nil { + klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message) + return + } + + if !validateEventType(eventtype) { + klog.Errorf("Unsupported event type: '%v'", eventtype) + return + } + + event := recorder.makeEvent(ref, annotations, eventtype, reason, message) + event.Source = recorder.source + + go func() { + // NOTE: events should be a non-blocking operation + defer utilruntime.HandleCrash() + recorder.Action(watch.Added, event) + }() +} + +func validateEventType(eventtype string) bool { + switch eventtype { + case v1.EventTypeNormal, v1.EventTypeWarning: + return true + } + return false +} + +func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) { + recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message) +} + +func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...)) +} + +func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) { + recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...)) +} + +func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { + recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...)) +} + +func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event { + t := metav1.Time{Time: recorder.clock.Now()} + namespace := ref.Namespace + if namespace == "" { + namespace = metav1.NamespaceDefault + } + return &v1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), + Namespace: namespace, + Annotations: annotations, + }, + InvolvedObject: *ref, + Reason: reason, + Message: message, + FirstTimestamp: t, + LastTimestamp: t, + Count: 1, + Type: eventtype, + } +} diff --git a/vendor/k8s.io/client-go/tools/record/events_cache.go b/vendor/k8s.io/client-go/tools/record/events_cache.go new file mode 100644 index 000000000..a42084f3a --- /dev/null +++ b/vendor/k8s.io/client-go/tools/record/events_cache.go @@ -0,0 +1,462 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/golang/groupcache/lru" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/util/flowcontrol" +) + +const ( + maxLruCacheEntries = 4096 + + // if we see the same event that varies only by message + // more than 10 times in a 10 minute period, aggregate the event + defaultAggregateMaxEvents = 10 + defaultAggregateIntervalInSeconds = 600 + + // by default, allow a source to send 25 events about an object + // but control the refill rate to 1 new event every 5 minutes + // this helps control the long-tail of events for things that are always + // unhealthy + defaultSpamBurst = 25 + defaultSpamQPS = 1. / 300. +) + +// getEventKey builds unique event key based on source, involvedObject, reason, message +func getEventKey(event *v1.Event) string { + return strings.Join([]string{ + event.Source.Component, + event.Source.Host, + event.InvolvedObject.Kind, + event.InvolvedObject.Namespace, + event.InvolvedObject.Name, + event.InvolvedObject.FieldPath, + string(event.InvolvedObject.UID), + event.InvolvedObject.APIVersion, + event.Type, + event.Reason, + event.Message, + }, + "") +} + +// getSpamKey builds unique event key based on source, involvedObject +func getSpamKey(event *v1.Event) string { + return strings.Join([]string{ + event.Source.Component, + event.Source.Host, + event.InvolvedObject.Kind, + event.InvolvedObject.Namespace, + event.InvolvedObject.Name, + string(event.InvolvedObject.UID), + event.InvolvedObject.APIVersion, + }, + "") +} + +// EventFilterFunc is a function that returns true if the event should be skipped +type EventFilterFunc func(event *v1.Event) bool + +// EventSourceObjectSpamFilter is responsible for throttling +// the amount of events a source and object can produce. +type EventSourceObjectSpamFilter struct { + sync.RWMutex + + // the cache that manages last synced state + cache *lru.Cache + + // burst is the amount of events we allow per source + object + burst int + + // qps is the refill rate of the token bucket in queries per second + qps float32 + + // clock is used to allow for testing over a time interval + clock clock.Clock +} + +// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill. +func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter { + return &EventSourceObjectSpamFilter{ + cache: lru.New(lruCacheSize), + burst: burst, + qps: qps, + clock: clock, + } +} + +// spamRecord holds data used to perform spam filtering decisions. +type spamRecord struct { + // rateLimiter controls the rate of events about this object + rateLimiter flowcontrol.RateLimiter +} + +// Filter controls that a given source+object are not exceeding the allowed rate. +func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool { + var record spamRecord + + // controls our cached information about this event (source+object) + eventKey := getSpamKey(event) + + // do we have a record of similar events in our cache? + f.Lock() + defer f.Unlock() + value, found := f.cache.Get(eventKey) + if found { + record = value.(spamRecord) + } + + // verify we have a rate limiter for this record + if record.rateLimiter == nil { + record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock) + } + + // ensure we have available rate + filter := !record.rateLimiter.TryAccept() + + // update the cache + f.cache.Add(eventKey, record) + + return filter +} + +// EventAggregatorKeyFunc is responsible for grouping events for aggregation +// It returns a tuple of the following: +// aggregateKey - key the identifies the aggregate group to bucket this event +// localKey - key that makes this event in the local group +type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string) + +// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason +func EventAggregatorByReasonFunc(event *v1.Event) (string, string) { + return strings.Join([]string{ + event.Source.Component, + event.Source.Host, + event.InvolvedObject.Kind, + event.InvolvedObject.Namespace, + event.InvolvedObject.Name, + string(event.InvolvedObject.UID), + event.InvolvedObject.APIVersion, + event.Type, + event.Reason, + }, + ""), event.Message +} + +// EventAggregatorMessageFunc is responsible for producing an aggregation message +type EventAggregatorMessageFunc func(event *v1.Event) string + +// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message +func EventAggregatorByReasonMessageFunc(event *v1.Event) string { + return "(combined from similar events): " + event.Message +} + +// EventAggregator identifies similar events and aggregates them into a single event +type EventAggregator struct { + sync.RWMutex + + // The cache that manages aggregation state + cache *lru.Cache + + // The function that groups events for aggregation + keyFunc EventAggregatorKeyFunc + + // The function that generates a message for an aggregate event + messageFunc EventAggregatorMessageFunc + + // The maximum number of events in the specified interval before aggregation occurs + maxEvents uint + + // The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new + maxIntervalInSeconds uint + + // clock is used to allow for testing over a time interval + clock clock.Clock +} + +// NewEventAggregator returns a new instance of an EventAggregator +func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc, + maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator { + return &EventAggregator{ + cache: lru.New(lruCacheSize), + keyFunc: keyFunc, + messageFunc: messageFunc, + maxEvents: uint(maxEvents), + maxIntervalInSeconds: uint(maxIntervalInSeconds), + clock: clock, + } +} + +// aggregateRecord holds data used to perform aggregation decisions +type aggregateRecord struct { + // we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate + // if the size of this set exceeds the max, we know we need to aggregate + localKeys sets.String + // The last time at which the aggregate was recorded + lastTimestamp metav1.Time +} + +// EventAggregate checks if a similar event has been seen according to the +// aggregation configuration (max events, max interval, etc) and returns: +// +// - The (potentially modified) event that should be created +// - The cache key for the event, for correlation purposes. This will be set to +// the full key for normal events, and to the result of +// EventAggregatorMessageFunc for aggregate events. +func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) { + now := metav1.NewTime(e.clock.Now()) + var record aggregateRecord + // eventKey is the full cache key for this event + eventKey := getEventKey(newEvent) + // aggregateKey is for the aggregate event, if one is needed. + aggregateKey, localKey := e.keyFunc(newEvent) + + // Do we have a record of similar events in our cache? + e.Lock() + defer e.Unlock() + value, found := e.cache.Get(aggregateKey) + if found { + record = value.(aggregateRecord) + } + + // Is the previous record too old? If so, make a fresh one. Note: if we didn't + // find a similar record, its lastTimestamp will be the zero value, so we + // create a new one in that case. + maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second + interval := now.Time.Sub(record.lastTimestamp.Time) + if interval > maxInterval { + record = aggregateRecord{localKeys: sets.NewString()} + } + + // Write the new event into the aggregation record and put it on the cache + record.localKeys.Insert(localKey) + record.lastTimestamp = now + e.cache.Add(aggregateKey, record) + + // If we are not yet over the threshold for unique events, don't correlate them + if uint(record.localKeys.Len()) < e.maxEvents { + return newEvent, eventKey + } + + // do not grow our local key set any larger than max + record.localKeys.PopAny() + + // create a new aggregate event, and return the aggregateKey as the cache key + // (so that it can be overwritten.) + eventCopy := &v1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()), + Namespace: newEvent.Namespace, + }, + Count: 1, + FirstTimestamp: now, + InvolvedObject: newEvent.InvolvedObject, + LastTimestamp: now, + Message: e.messageFunc(newEvent), + Type: newEvent.Type, + Reason: newEvent.Reason, + Source: newEvent.Source, + } + return eventCopy, aggregateKey +} + +// eventLog records data about when an event was observed +type eventLog struct { + // The number of times the event has occurred since first occurrence. + count uint + + // The time at which the event was first recorded. + firstTimestamp metav1.Time + + // The unique name of the first occurrence of this event + name string + + // Resource version returned from previous interaction with server + resourceVersion string +} + +// eventLogger logs occurrences of an event +type eventLogger struct { + sync.RWMutex + cache *lru.Cache + clock clock.Clock +} + +// newEventLogger observes events and counts their frequencies +func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger { + return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock} +} + +// eventObserve records an event, or updates an existing one if key is a cache hit +func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) { + var ( + patch []byte + err error + ) + eventCopy := *newEvent + event := &eventCopy + + e.Lock() + defer e.Unlock() + + // Check if there is an existing event we should update + lastObservation := e.lastEventObservationFromCache(key) + + // If we found a result, prepare a patch + if lastObservation.count > 0 { + // update the event based on the last observation so patch will work as desired + event.Name = lastObservation.name + event.ResourceVersion = lastObservation.resourceVersion + event.FirstTimestamp = lastObservation.firstTimestamp + event.Count = int32(lastObservation.count) + 1 + + eventCopy2 := *event + eventCopy2.Count = 0 + eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0)) + eventCopy2.Message = "" + + newData, _ := json.Marshal(event) + oldData, _ := json.Marshal(eventCopy2) + patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event) + } + + // record our new observation + e.cache.Add( + key, + eventLog{ + count: uint(event.Count), + firstTimestamp: event.FirstTimestamp, + name: event.Name, + resourceVersion: event.ResourceVersion, + }, + ) + return event, patch, err +} + +// updateState updates its internal tracking information based on latest server state +func (e *eventLogger) updateState(event *v1.Event) { + key := getEventKey(event) + e.Lock() + defer e.Unlock() + // record our new observation + e.cache.Add( + key, + eventLog{ + count: uint(event.Count), + firstTimestamp: event.FirstTimestamp, + name: event.Name, + resourceVersion: event.ResourceVersion, + }, + ) +} + +// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock +func (e *eventLogger) lastEventObservationFromCache(key string) eventLog { + value, ok := e.cache.Get(key) + if ok { + observationValue, ok := value.(eventLog) + if ok { + return observationValue + } + } + return eventLog{} +} + +// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all +// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur +// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication +// to ensure events that are observed multiple times are compacted into a single event with increasing counts. +type EventCorrelator struct { + // the function to filter the event + filterFunc EventFilterFunc + // the object that performs event aggregation + aggregator *EventAggregator + // the object that observes events as they come through + logger *eventLogger +} + +// EventCorrelateResult is the result of a Correlate +type EventCorrelateResult struct { + // the event after correlation + Event *v1.Event + // if provided, perform a strategic patch when updating the record on the server + Patch []byte + // if true, do no further processing of the event + Skip bool +} + +// NewEventCorrelator returns an EventCorrelator configured with default values. +// +// The EventCorrelator is responsible for event filtering, aggregating, and counting +// prior to interacting with the API server to record the event. +// +// The default behavior is as follows: +// * Aggregation is performed if a similar event is recorded 10 times in a +// in a 10 minute rolling interval. A similar event is an event that varies only by +// the Event.Message field. Rather than recording the precise event, aggregation +// will create a new event whose message reports that it has combined events with +// the same reason. +// * Events are incrementally counted if the exact same event is encountered multiple +// times. +// * A source may burst 25 events about an object, but has a refill rate budget +// per object of 1 event every 5 minutes to control long-tail of spam. +func NewEventCorrelator(clock clock.Clock) *EventCorrelator { + cacheSize := maxLruCacheEntries + spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock) + return &EventCorrelator{ + filterFunc: spamFilter.Filter, + aggregator: NewEventAggregator( + cacheSize, + EventAggregatorByReasonFunc, + EventAggregatorByReasonMessageFunc, + defaultAggregateMaxEvents, + defaultAggregateIntervalInSeconds, + clock), + + logger: newEventLogger(cacheSize, clock), + } +} + +// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events +func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) { + if newEvent == nil { + return nil, fmt.Errorf("event is nil") + } + aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent) + observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) + if c.filterFunc(observedEvent) { + return &EventCorrelateResult{Skip: true}, nil + } + return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err +} + +// UpdateState based on the latest observed state from server +func (c *EventCorrelator) UpdateState(event *v1.Event) { + c.logger.updateState(event) +} diff --git a/vendor/k8s.io/client-go/tools/record/fake.go b/vendor/k8s.io/client-go/tools/record/fake.go new file mode 100644 index 000000000..6e031daaf --- /dev/null +++ b/vendor/k8s.io/client-go/tools/record/fake.go @@ -0,0 +1,58 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// FakeRecorder is used as a fake during tests. It is thread safe. It is usable +// when created manually and not by NewFakeRecorder, however all events may be +// thrown away in this case. +type FakeRecorder struct { + Events chan string +} + +func (f *FakeRecorder) Event(object runtime.Object, eventtype, reason, message string) { + if f.Events != nil { + f.Events <- fmt.Sprintf("%s %s %s", eventtype, reason, message) + } +} + +func (f *FakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + if f.Events != nil { + f.Events <- fmt.Sprintf(eventtype+" "+reason+" "+messageFmt, args...) + } +} + +func (f *FakeRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) { +} + +func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { + f.Eventf(object, eventtype, reason, messageFmt, args) +} + +// NewFakeRecorder creates new fake event recorder with event channel with +// buffer of given size. +func NewFakeRecorder(bufferSize int) *FakeRecorder { + return &FakeRecorder{ + Events: make(chan string, bufferSize), + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1d467e3c3..0eb248cac 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -21,6 +21,8 @@ github.com/gogo/protobuf/gogoproto github.com/gogo/protobuf/proto github.com/gogo/protobuf/protoc-gen-gogo/descriptor github.com/gogo/protobuf/sortkeys +# github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 +github.com/golang/groupcache/lru # github.com/golang/protobuf v1.3.2 github.com/golang/protobuf/proto github.com/golang/protobuf/ptypes @@ -356,6 +358,7 @@ k8s.io/client-go/tools/clientcmd/api k8s.io/client-go/tools/clientcmd/api/latest k8s.io/client-go/tools/clientcmd/api/v1 k8s.io/client-go/tools/metrics +k8s.io/client-go/tools/record k8s.io/client-go/tools/reference k8s.io/client-go/transport k8s.io/client-go/util/cert