Merge pull request #83692 from yastij/fix-events-scheduler

add a fallback for kube-scheduler  when events.k8s.io is disabled
This commit is contained in:
Kubernetes Prow Robot 2019-10-28 14:08:43 -07:00 committed by GitHub
commit 486e2380bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 16 deletions

View File

@ -20,6 +20,8 @@ go_library(
"//pkg/scheduler/metrics:go_default_library",
"//pkg/util/configz:go_default_library",
"//pkg/util/flag:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
@ -30,8 +32,11 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/term:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/component-base/cli/globalflag:go_default_library",
"//staging/src/k8s.io/component-base/logs:go_default_library",

View File

@ -47,12 +47,12 @@ type Config struct {
Client clientset.Interface
InformerFactory informers.SharedInformerFactory
PodInformer coreinformers.PodInformer
EventClient v1beta1.EventsGetter
// TODO: Remove the following after fully migrating to the new events api.
CoreEventClient v1core.EventsGetter
LeaderElectionBroadcaster record.EventBroadcaster
CoreEventClient v1core.EventsGetter
CoreBroadcaster record.EventBroadcaster
EventClient v1beta1.EventsGetter
Recorder events.EventRecorder
Broadcaster events.EventBroadcaster

View File

@ -32,7 +32,6 @@ go_library(
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",

View File

@ -34,7 +34,6 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
@ -236,16 +235,13 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
return nil, err
}
// Prepare event clients.
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient.EventsV1beta1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, c.ComponentConfig.SchedulerName)
leaderElectionBroadcaster := record.NewBroadcaster()
leaderElectionRecorder := leaderElectionBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
coreBroadcaster := record.NewBroadcaster()
coreRecorder := coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
if c.ComponentConfig.LeaderElection.LeaderElect {
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, leaderElectionRecorder)
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, coreRecorder)
if err != nil {
return nil, err
}
@ -256,9 +252,7 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
c.PodInformer = scheduler.NewPodInformer(client, 0)
c.EventClient = eventClient.EventsV1beta1()
c.CoreEventClient = eventClient.CoreV1()
c.Recorder = recorder
c.Broadcaster = eventBroadcaster
c.LeaderElectionBroadcaster = leaderElectionBroadcaster
c.CoreBroadcaster = coreBroadcaster
c.LeaderElection = leaderElectionConfig
return c, nil

View File

@ -27,6 +27,8 @@ import (
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
eventsv1beta1 "k8s.io/api/events/v1beta1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer"
@ -37,8 +39,11 @@ import (
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/apiserver/pkg/util/term"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/logs"
@ -168,6 +173,15 @@ func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTre
}
}
// Prepare event clients.
if _, err := cc.Client.Discovery().ServerResourcesForGroupVersion(eventsv1beta1.SchemeGroupVersion.String()); err == nil {
cc.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: cc.EventClient.Events("")})
cc.Recorder = cc.Broadcaster.NewRecorder(scheme.Scheme, cc.ComponentConfig.SchedulerName)
} else {
recorder := cc.CoreBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: cc.ComponentConfig.SchedulerName})
cc.Recorder = record.NewEventRecorderAdapter(recorder)
}
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
@ -194,8 +208,8 @@ func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTre
if cc.Broadcaster != nil && cc.EventClient != nil {
cc.Broadcaster.StartRecordingToSink(ctx.Done())
}
if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil {
cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
}
// Setup healthz checks.
var checks []healthz.HealthChecker

View File

@ -132,6 +132,25 @@ type EventBroadcaster interface {
Shutdown()
}
// EventRecorderAdapter is a wrapper around EventRecorder implementing the
// new EventRecorder interface.
type EventRecorderAdapter struct {
recorder EventRecorder
}
// NewEventRecorderAdapter returns an adapter implementing new EventRecorder
// interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
return &EventRecorderAdapter{
recorder: recorder,
}
}
// Eventf is a wrapper around v1 Eventf
func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{