mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Make scheduler not miss deletion events even in the case of a resync.
This commit is contained in:
parent
880f922bb6
commit
5f7715f0e9
3
pkg/client/cache/store.go
vendored
3
pkg/client/cache/store.go
vendored
@ -68,6 +68,9 @@ type ExplicitKey string
|
||||
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
|
||||
// it's just <name>.
|
||||
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
|
||||
if key, ok := obj.(ExplicitKey); ok {
|
||||
return string(key), nil
|
||||
}
|
||||
meta, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("object has no meta: %v", err)
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
|
||||
@ -51,7 +52,11 @@ type ConfigFactory struct {
|
||||
// a means to list all services
|
||||
ServiceLister *cache.StoreToServiceLister
|
||||
|
||||
modeler scheduler.SystemModeler
|
||||
// Close this to stop all reflectors
|
||||
StopEverything chan struct{}
|
||||
|
||||
scheduledPodPopulator *framework.Controller
|
||||
modeler scheduler.SystemModeler
|
||||
}
|
||||
|
||||
// Initializes the factory.
|
||||
@ -59,13 +64,40 @@ func NewConfigFactory(client *client.Client) *ConfigFactory {
|
||||
c := &ConfigFactory{
|
||||
Client: client,
|
||||
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
|
||||
ScheduledPodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ScheduledPodLister: &cache.StoreToPodLister{},
|
||||
NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
StopEverything: make(chan struct{}),
|
||||
}
|
||||
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
|
||||
c.modeler = modeler
|
||||
c.PodLister = modeler.PodLister()
|
||||
|
||||
// On add/delete to the scheduled pods, remove from the assumed pods.
|
||||
// We construct this here instead of in CreateFromKeys because
|
||||
// ScheduledPodLister is something we provide to plug in functions that
|
||||
// they may need to call.
|
||||
c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer(
|
||||
c.createAssignedPodLW(),
|
||||
&api.Pod{},
|
||||
0,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
if pod, ok := obj.(*api.Pod); ok {
|
||||
c.modeler.ForgetPod(pod)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
switch t := obj.(type) {
|
||||
case *api.Pod:
|
||||
c.modeler.ForgetPod(t)
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
c.modeler.ForgetPodByKey(t.Key)
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
@ -109,21 +141,6 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
|
||||
return f.CreateFromKeys(predicateKeys, priorityKeys)
|
||||
}
|
||||
|
||||
// ReflectorDeletionHook passes all operations through to Store, but calls
|
||||
// OnDelete in a goroutine if there is a deletion.
|
||||
type ReflectorDeletionHook struct {
|
||||
cache.Store
|
||||
OnDelete func(obj interface{})
|
||||
}
|
||||
|
||||
func (r ReflectorDeletionHook) Delete(obj interface{}) error {
|
||||
go func() {
|
||||
defer util.HandleCrash()
|
||||
r.OnDelete(obj)
|
||||
}()
|
||||
return r.Store.Delete(obj)
|
||||
}
|
||||
|
||||
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
|
||||
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
|
||||
glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
|
||||
@ -144,39 +161,25 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
|
||||
}
|
||||
|
||||
// Watch and queue pods that need scheduling.
|
||||
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run()
|
||||
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)
|
||||
|
||||
// Pass through all events to the scheduled pod store, but on a deletion,
|
||||
// also remove from the assumed pods.
|
||||
assumedPodDeleter := ReflectorDeletionHook{
|
||||
Store: f.ScheduledPodLister.Store,
|
||||
OnDelete: func(obj interface{}) {
|
||||
if pod, ok := obj.(*api.Pod); ok {
|
||||
f.modeler.LockedAction(func() {
|
||||
f.modeler.ForgetPod(pod)
|
||||
})
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// Watch and cache all running pods. Scheduler needs to find all pods
|
||||
// so it knows where it's safe to place a pod. Cache this locally.
|
||||
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run()
|
||||
// Begin populating scheduled pods.
|
||||
f.scheduledPodPopulator.Run(f.StopEverything)
|
||||
|
||||
// Watch minions.
|
||||
// Minions may be listed frequently, so provide a local up-to-date cache.
|
||||
if false {
|
||||
// Disable this code until minions support watches. Note when this code is enabled,
|
||||
// we need to make sure minion ListWatcher has proper FieldSelector.
|
||||
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).Run()
|
||||
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 10*time.Second).RunUntil(f.StopEverything)
|
||||
} else {
|
||||
cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).Run()
|
||||
cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).RunUntil(f.StopEverything)
|
||||
}
|
||||
|
||||
// Watch and cache all service objects. Scheduler needs to find all pods
|
||||
// created by the same service, so that it can spread them correctly.
|
||||
// Cache this locally.
|
||||
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).Run()
|
||||
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
@ -200,7 +203,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
|
||||
glog.V(2).Infof("About to try and schedule pod %v", pod.Name)
|
||||
return pod
|
||||
},
|
||||
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
|
||||
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
|
||||
StopEverything: f.StopEverything,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -57,8 +57,9 @@ func (a *actionLocker) LockedAction(do func()) {
|
||||
|
||||
// FakeModeler implements the SystemModeler interface.
|
||||
type FakeModeler struct {
|
||||
AssumePodFunc func(pod *api.Pod)
|
||||
ForgetPodFunc func(pod *api.Pod)
|
||||
AssumePodFunc func(pod *api.Pod)
|
||||
ForgetPodFunc func(pod *api.Pod)
|
||||
ForgetPodByKeyFunc func(key string)
|
||||
actionLocker
|
||||
}
|
||||
|
||||
@ -76,6 +77,13 @@ func (f *FakeModeler) ForgetPod(pod *api.Pod) {
|
||||
}
|
||||
}
|
||||
|
||||
// ForgetPodByKey calls the function variable if it is not nil.
|
||||
func (f *FakeModeler) ForgetPodByKey(key string) {
|
||||
if f.ForgetPodFunc != nil {
|
||||
f.ForgetPodByKeyFunc(key)
|
||||
}
|
||||
}
|
||||
|
||||
// SimpleModeler implements the SystemModeler interface with a timed pod cache.
|
||||
type SimpleModeler struct {
|
||||
queuedPods ExtendedPodLister
|
||||
@ -110,6 +118,10 @@ func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
|
||||
s.assumedPods.Delete(pod)
|
||||
}
|
||||
|
||||
func (s *SimpleModeler) ForgetPodByKey(key string) {
|
||||
s.assumedPods.Delete(cache.ExplicitKey(key))
|
||||
}
|
||||
|
||||
// Extract names for readable logging.
|
||||
func podNames(pods []api.Pod) []string {
|
||||
out := make([]string, len(pods))
|
||||
|
@ -51,6 +51,7 @@ type SystemModeler interface {
|
||||
// show the absence of the given pod if the pod is in the scheduled
|
||||
// pods list!)
|
||||
ForgetPod(pod *api.Pod)
|
||||
ForgetPodByKey(key string)
|
||||
|
||||
// For serializing calls to Assume/ForgetPod: imagine you want to add
|
||||
// a pod iff a bind succeeds, but also remove a pod if it is deleted.
|
||||
@ -85,6 +86,9 @@ type Config struct {
|
||||
|
||||
// Recorder is the EventRecorder to use
|
||||
Recorder record.EventRecorder
|
||||
|
||||
// Close this to shut down the scheduler.
|
||||
StopEverything chan struct{}
|
||||
}
|
||||
|
||||
// New returns a new scheduler.
|
||||
@ -98,7 +102,7 @@ func New(c *Config) *Scheduler {
|
||||
|
||||
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
|
||||
func (s *Scheduler) Run() {
|
||||
go util.Forever(s.scheduleOne, 0)
|
||||
go util.Until(s.scheduleOne, 0, s.config.StopEverything)
|
||||
}
|
||||
|
||||
func (s *Scheduler) scheduleOne() {
|
||||
|
Loading…
Reference in New Issue
Block a user