Merge pull request #102832 from Yuan-Junliang/migrateProxyEventAPI

Migrate kube-proxy event to use v1 Event API
This commit is contained in:
Kubernetes Prow Robot 2021-07-05 17:44:17 -07:00 committed by GitHub
commit 96dff7d0c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 60 additions and 59 deletions

View File

@ -50,7 +50,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
cliflag "k8s.io/component-base/cli/flag"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/component-base/configz"
@ -524,8 +524,8 @@ type ProxyServer struct {
IpsetInterface utilipset.Interface
execer exec.Interface
Proxier proxy.Provider
Broadcaster record.EventBroadcaster
Recorder record.EventRecorder
Broadcaster events.EventBroadcaster
Recorder events.EventRecorder
ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
Conntracker Conntracker // if nil, ignored
ProxyMode string
@ -654,7 +654,8 @@ func (s *ProxyServer) Run() error {
}
if s.Broadcaster != nil && s.EventClient != nil {
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
stopCh := make(chan struct{})
s.Broadcaster.StartRecordingToSink(stopCh)
}
// TODO(thockin): make it possible for healthz and metrics to be on the same port.
@ -692,7 +693,7 @@ func (s *ProxyServer) Run() error {
// TODO(random-liu): Remove this when the docker bug is fixed.
const message = "CRI error: /sys is read-only: " +
"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message)
}
}
@ -776,7 +777,7 @@ func (s *ProxyServer) Run() error {
}
func (s *ProxyServer) birthCry() {
s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
}
func getConntrackMax(config kubeproxyconfig.KubeProxyConntrackConfiguration) (int, error) {

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/apimachinery/pkg/fields"
@ -45,14 +46,13 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
toolswatch "k8s.io/client-go/tools/watch"
"k8s.io/component-base/configz"
"k8s.io/component-base/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
"k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/ipvs"
@ -140,8 +140,8 @@ func newProxyServer(
klog.Infof("Detected node IP %s", nodeIP.String())
// Create event recorder
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")
nodeRef := &v1.ObjectReference{
Kind: "Node",

View File

@ -33,7 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/component-base/configz"
"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
@ -85,8 +85,8 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
if err != nil {
return nil, err
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
nodeRef := &v1.ObjectReference{
Kind: "Node",

View File

@ -35,7 +35,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/logs"
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
@ -274,8 +274,8 @@ func run(cmd *cobra.Command, config *hollowNodeConfig) {
execer := &fakeexec.FakeExec{
LookPathFunc: func(_ string) (string, error) { return "", errors.New("fake execer") },
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "kube-proxy", Host: config.NodeName})
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "kube-proxy")
hollowProxy, err := kubemark.NewHollowProxyOrDie(
config.NodeName,

View File

@ -25,7 +25,7 @@ import (
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app"
"k8s.io/kubernetes/pkg/proxy"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
@ -69,8 +69,8 @@ func NewHollowProxyOrDie(
iptInterface utiliptables.Interface,
sysctl utilsysctl.Interface,
execer utilexec.Interface,
broadcaster record.EventBroadcaster,
recorder record.EventRecorder,
broadcaster events.EventBroadcaster,
recorder events.EventRecorder,
useRealProxier bool,
proxierSyncPeriod time.Duration,
proxierMinSyncPeriod time.Duration,

View File

@ -23,13 +23,13 @@ import (
"sync"
"time"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilnet "k8s.io/utils/net"
@ -170,7 +170,7 @@ type EndpointChangeTracker struct {
endpointSliceCache *EndpointSliceCache
// ipfamily identify the ip family on which the tracker is operating on
ipFamily v1.IPFamily
recorder record.EventRecorder
recorder events.EventRecorder
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
@ -182,7 +182,7 @@ type EndpointChangeTracker struct {
}
// NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder record.EventRecorder, endpointSlicesEnabled bool, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, endpointSlicesEnabled bool, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
ect := &EndpointChangeTracker{
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),

View File

@ -28,7 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
@ -51,7 +51,7 @@ type EndpointSliceCache struct {
makeEndpointInfo makeEndpointFunc
hostname string
ipFamily v1.IPFamily
recorder record.EventRecorder
recorder events.EventRecorder
}
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
@ -93,7 +93,7 @@ type endpointInfo struct {
type spToEndpointMap map[ServicePortName]map[string]Endpoint
// NewEndpointSliceCache initializes an EndpointSliceCache.
func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder record.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
if makeEndpointInfo == nil {
makeEndpointInfo = standardEndpointInfo
}

View File

@ -22,9 +22,9 @@ import (
"sync/atomic"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
)
@ -54,7 +54,7 @@ type proxierHealthServer struct {
addr string
healthTimeout time.Duration
recorder record.EventRecorder
recorder events.EventRecorder
nodeRef *v1.ObjectReference
lastUpdated atomic.Value
@ -62,11 +62,11 @@ type proxierHealthServer struct {
}
// NewProxierHealthServer returns a proxier health http server.
func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) ProxierHealthUpdater {
func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) ProxierHealthUpdater {
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
}
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
return &proxierHealthServer{
listener: listener,
httpFactory: httpServerFactory,
@ -99,7 +99,7 @@ func (hs *proxierHealthServer) Run() error {
msg := fmt.Sprintf("failed to start proxier healthz on %s: %v", hs.addr, err)
// TODO(thockin): move eventing back to caller
if hs.recorder != nil {
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
hs.recorder.Eventf(hs.nodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", msg)
}
return fmt.Errorf("%v", msg)
}

View File

@ -24,11 +24,11 @@ import (
"sync"
"github.com/lithammer/dedent"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
api "k8s.io/kubernetes/pkg/apis/core"
)
@ -48,7 +48,7 @@ type ServiceHealthServer interface {
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
func newServiceHealthServer(hostname string, recorder record.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer {
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer {
return &server{
hostname: hostname,
recorder: recorder,
@ -59,13 +59,13 @@ func newServiceHealthServer(hostname string, recorder record.EventRecorder, list
}
// NewServiceHealthServer allocates a new service healthcheck server manager
func NewServiceHealthServer(hostname string, recorder record.EventRecorder) ServiceHealthServer {
func NewServiceHealthServer(hostname string, recorder events.EventRecorder) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{})
}
type server struct {
hostname string
recorder record.EventRecorder // can be nil
recorder events.EventRecorder // can be nil
listener listener
httpFactory httpServerFactory
@ -111,7 +111,7 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
Namespace: nsn.Namespace,
Name: nsn.Name,
UID: types.UID(nsn.String()),
}, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg)
}, nil, api.EventTypeWarning, "FailedToStartServiceHealthcheck", "Listen", msg)
}
klog.Error(msg)
continue

View File

@ -39,7 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
@ -210,7 +210,7 @@ type Proxier struct {
hostname string
nodeIP net.IP
portMapper utilnet.PortOpener
recorder record.EventRecorder
recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater
@ -260,7 +260,7 @@ func NewProxier(ipt utiliptables.Interface,
localDetector proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIP net.IP,
recorder record.EventRecorder,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
nodePortAddresses []string,
) (*Proxier, error) {
@ -357,7 +357,7 @@ func NewDualStackProxier(
localDetectors [2]proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIP [2]net.IP,
recorder record.EventRecorder,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
nodePortAddresses []string,
) (proxy.Provider, error) {
@ -1121,7 +1121,7 @@ func (proxier *Proxier) syncProxyRules() {
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, v1.EventTypeWarning, err.Error(), msg)
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
continue
}
@ -1293,7 +1293,7 @@ func (proxier *Proxier) syncProxyRules() {
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, v1.EventTypeWarning, err.Error(), msg)
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
continue
}

View File

@ -43,7 +43,7 @@ import (
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
@ -247,7 +247,7 @@ type Proxier struct {
hostname string
nodeIP net.IP
portMapper utilnet.PortOpener
recorder record.EventRecorder
recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater
@ -349,7 +349,7 @@ func NewProxier(ipt utiliptables.Interface,
localDetector proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIP net.IP,
recorder record.EventRecorder,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
scheduler string,
nodePortAddresses []string,
@ -517,7 +517,7 @@ func NewDualStackProxier(
localDetectors [2]proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIP [2]net.IP,
recorder record.EventRecorder,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
scheduler string,
nodePortAddresses []string,
@ -1277,7 +1277,7 @@ func (proxier *Proxier) syncProxyRules() {
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, v1.EventTypeWarning, err.Error(), msg)
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
continue
}
@ -1478,7 +1478,7 @@ func (proxier *Proxier) syncProxyRules() {
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, v1.EventTypeWarning, err.Error(), msg)
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
continue
}

View File

@ -23,13 +23,13 @@ import (
"strings"
"sync"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy/metrics"
@ -250,11 +250,11 @@ type ServiceChangeTracker struct {
processServiceMapChange processServiceMapChangeFunc
ipFamily v1.IPFamily
recorder record.EventRecorder
recorder events.EventRecorder
}
// NewServiceChangeTracker initializes a ServiceChangeTracker
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder record.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
return &ServiceChangeTracker{
items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo,

View File

@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
utilnet "k8s.io/utils/net"
@ -252,7 +252,7 @@ func GetNodeAddresses(cidrs []string, nw NetworkInterfacer) (sets.String, error)
}
// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event.
func LogAndEmitIncorrectIPVersionEvent(recorder record.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) {
func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) {
errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName)
klog.Errorf("%s (service %s/%s).", errMsg, svcNamespace, svcName)
if recorder != nil {
@ -262,7 +262,7 @@ func LogAndEmitIncorrectIPVersionEvent(recorder record.EventRecorder, fieldName,
Name: svcName,
Namespace: svcNamespace,
UID: svcUID,
}, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", errMsg)
}, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg)
}
}

View File

@ -40,7 +40,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
@ -473,7 +473,7 @@ type Proxier struct {
clusterCIDR string
hostname string
nodeIP net.IP
recorder record.EventRecorder
recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater
@ -531,7 +531,7 @@ func NewProxier(
clusterCIDR string,
hostname string,
nodeIP net.IP,
recorder record.EventRecorder,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
config config.KubeProxyWinkernelConfiguration,
) (*Proxier, error) {
@ -672,7 +672,7 @@ func NewDualStackProxier(
clusterCIDR string,
hostname string,
nodeIP [2]net.IP,
recorder record.EventRecorder,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
config config.KubeProxyWinkernelConfiguration,
) (proxy.Provider, error) {