PVC informer lister supports listing

This commit is contained in:
derekwaynecarr 2016-10-14 14:26:25 -04:00
parent 6e1bb2d2f7
commit 555231fad7
5 changed files with 72 additions and 35 deletions

View File

@ -288,25 +288,6 @@ func (s *StoreToPVFetcher) GetPersistentVolumeInfo(id string) (*api.PersistentVo
return o.(*api.PersistentVolume), nil return o.(*api.PersistentVolume), nil
} }
// Typed wrapper around a store of PersistentVolumeClaims
type StoreToPVCFetcher struct {
Store
}
// GetPersistentVolumeClaimInfo returns cached data for the PersistentVolumeClaim 'id'.
func (s *StoreToPVCFetcher) GetPersistentVolumeClaimInfo(namespace string, id string) (*api.PersistentVolumeClaim, error) {
o, exists, err := s.Get(&api.PersistentVolumeClaim{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: id}})
if err != nil {
return nil, fmt.Errorf("error retrieving PersistentVolumeClaim '%s/%s' from cache: %v", namespace, id, err)
}
if !exists {
return nil, fmt.Errorf("PersistentVolumeClaim '%s/%s' not found", namespace, id)
}
return o.(*api.PersistentVolumeClaim), nil
}
// StoreToPetSetLister gives a store List and Exists methods. The store must contain only PetSets. // StoreToPetSetLister gives a store List and Exists methods. The store must contain only PetSets.
type StoreToPetSetLister struct { type StoreToPetSetLister struct {
Store Store

View File

@ -216,6 +216,19 @@ func (s *StoreToLimitRangeLister) List(selector labels.Selector) (ret []*api.Lim
return ret, err return ret, err
} }
// StoreToPersistentVolumeClaimLister helps list pvcs
type StoreToPersistentVolumeClaimLister struct {
Indexer Indexer
}
// List returns all persistentvolumeclaims that match the specified selector
func (s *StoreToPersistentVolumeClaimLister) List(selector labels.Selector) (ret []*api.PersistentVolumeClaim, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*api.PersistentVolumeClaim))
})
return ret, err
}
func (s *StoreToLimitRangeLister) LimitRanges(namespace string) storeLimitRangesNamespacer { func (s *StoreToLimitRangeLister) LimitRanges(namespace string) storeLimitRangesNamespacer {
return storeLimitRangesNamespacer{s.Indexer, namespace} return storeLimitRangesNamespacer{s.Indexer, namespace}
} }
@ -242,3 +255,31 @@ func (s storeLimitRangesNamespacer) Get(name string) (*api.LimitRange, error) {
} }
return obj.(*api.LimitRange), nil return obj.(*api.LimitRange), nil
} }
// PersistentVolumeClaims returns all claims in a specified namespace.
func (s *StoreToPersistentVolumeClaimLister) PersistentVolumeClaims(namespace string) storePersistentVolumeClaimsNamespacer {
return storePersistentVolumeClaimsNamespacer{Indexer: s.Indexer, namespace: namespace}
}
type storePersistentVolumeClaimsNamespacer struct {
Indexer Indexer
namespace string
}
func (s storePersistentVolumeClaimsNamespacer) List(selector labels.Selector) (ret []*api.PersistentVolumeClaim, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*api.PersistentVolumeClaim))
})
return ret, err
}
func (s storePersistentVolumeClaimsNamespacer) Get(name string) (*api.PersistentVolumeClaim, error) {
obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("persistentvolumeclaims"), name)
}
return obj.(*api.PersistentVolumeClaim), nil
}

View File

@ -139,7 +139,7 @@ func (f *nodeInformer) Lister() *cache.StoreToNodeLister {
// Interface provides constructor for informer and lister for persistent volume claims // Interface provides constructor for informer and lister for persistent volume claims
type PVCInformer interface { type PVCInformer interface {
Informer() cache.SharedIndexInformer Informer() cache.SharedIndexInformer
Lister() *cache.StoreToPVCFetcher Lister() *cache.StoreToPersistentVolumeClaimLister
} }
type pvcInformer struct { type pvcInformer struct {
@ -164,9 +164,9 @@ func (f *pvcInformer) Informer() cache.SharedIndexInformer {
} }
// Lister returns lister for pvcInformer // Lister returns lister for pvcInformer
func (f *pvcInformer) Lister() *cache.StoreToPVCFetcher { func (f *pvcInformer) Lister() *cache.StoreToPersistentVolumeClaimLister {
informer := f.Informer() informer := f.Informer()
return &cache.StoreToPVCFetcher{Store: informer.GetStore()} return &cache.StoreToPersistentVolumeClaimLister{Indexer: informer.GetIndexer()}
} }
//***************************************************************************** //*****************************************************************************
@ -291,7 +291,8 @@ func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cach
}, },
&api.PersistentVolumeClaim{}, &api.PersistentVolumeClaim{},
resyncPeriod, resyncPeriod,
cache.Indexers{}) cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer return sharedIndexInformer
} }

View File

@ -45,7 +45,17 @@ type PersistentVolumeInfo interface {
} }
type PersistentVolumeClaimInfo interface { type PersistentVolumeClaimInfo interface {
GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*api.PersistentVolumeClaim, error) GetPersistentVolumeClaimInfo(namespace string, name string) (*api.PersistentVolumeClaim, error)
}
// CachedPersistentVolumeClaimInfo implements PersistentVolumeClaimInfo
type CachedPersistentVolumeClaimInfo struct {
*cache.StoreToPersistentVolumeClaimLister
}
// GetPersistentVolumeClaimInfo fetches the claim in specified namespace with specified name
func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace string, name string) (*api.PersistentVolumeClaim, error) {
return c.PersistentVolumeClaims(namespace).Get(name)
} }
type CachedNodeInfo struct { type CachedNodeInfo struct {

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
@ -65,7 +66,7 @@ type ConfigFactory struct {
// a means to list all PersistentVolumes // a means to list all PersistentVolumes
PVLister *cache.StoreToPVFetcher PVLister *cache.StoreToPVFetcher
// a means to list all PersistentVolumeClaims // a means to list all PersistentVolumeClaims
PVCLister *cache.StoreToPVCFetcher PVCLister *cache.StoreToPersistentVolumeClaimLister
// a means to list all services // a means to list all services
ServiceLister *cache.StoreToServiceLister ServiceLister *cache.StoreToServiceLister
// a means to list all controllers // a means to list all controllers
@ -76,10 +77,11 @@ type ConfigFactory struct {
// Close this to stop all reflectors // Close this to stop all reflectors
StopEverything chan struct{} StopEverything chan struct{}
informerFactory informers.SharedInformerFactory
scheduledPodPopulator *cache.Controller scheduledPodPopulator *cache.Controller
nodePopulator *cache.Controller nodePopulator *cache.Controller
pvPopulator *cache.Controller pvPopulator *cache.Controller
pvcPopulator *cache.Controller pvcPopulator cache.ControllerInterface
servicePopulator *cache.Controller servicePopulator *cache.Controller
controllerPopulator *cache.Controller controllerPopulator *cache.Controller
@ -107,14 +109,20 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
stopEverything := make(chan struct{}) stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything) schedulerCache := schedulercache.New(30*time.Second, stopEverything)
// TODO: pass this in as an argument...
informerFactory := informers.NewSharedInformerFactory(client, 0)
pvcInformer := informerFactory.PersistentVolumeClaims()
c := &ConfigFactory{ c := &ConfigFactory{
Client: client, Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
ScheduledPodLister: &cache.StoreToPodLister{}, ScheduledPodLister: &cache.StoreToPodLister{},
informerFactory: informerFactory,
// Only nodes in the "Ready" condition with status == "True" are schedulable // Only nodes in the "Ready" condition with status == "True" are schedulable
NodeLister: &cache.StoreToNodeLister{}, NodeLister: &cache.StoreToNodeLister{},
PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, PVCLister: pvcInformer.Lister(),
pvcPopulator: pvcInformer.Informer().GetController(),
ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
@ -162,13 +170,6 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
cache.ResourceEventHandlerFuncs{}, cache.ResourceEventHandlerFuncs{},
) )
c.PVCLister.Store, c.pvcPopulator = cache.NewInformer(
c.createPersistentVolumeClaimLW(),
&api.PersistentVolumeClaim{},
0,
cache.ResourceEventHandlerFuncs{},
)
c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer( c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
c.createServiceLW(), c.createServiceLW(),
&api.Service{}, &api.Service{},
@ -434,7 +435,7 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.NodeLister}, NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.NodeLister},
PVInfo: f.PVLister, PVInfo: f.PVLister,
PVCInfo: f.PVCLister, PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.PVCLister},
HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight, HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight,
FailureDomains: sets.NewString(failureDomainArgs...).List(), FailureDomains: sets.NewString(failureDomainArgs...).List(),
}, nil }, nil
@ -460,6 +461,9 @@ func (f *ConfigFactory) Run() {
// Begin populating controllers // Begin populating controllers
go f.controllerPopulator.Run(f.StopEverything) go f.controllerPopulator.Run(f.StopEverything)
// start informers...
f.informerFactory.Start(f.StopEverything)
// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally. // Cache this locally.