node controller use shared pod informer

This commit is contained in:
Seth Jennings 2016-07-20 15:26:07 -05:00
parent 6d77f53af4
commit db6026c82a
4 changed files with 50 additions and 34 deletions

View File

@ -239,7 +239,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if err != nil { if err != nil {
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
} }
nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), nodeController, err := nodecontroller.NewNodeController(podInformer, cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration, s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)

View File

@ -153,7 +153,7 @@ func (s *CMServer) Run(_ []string) error {
} }
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR) _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR) _, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), nodeController, err := nodecontroller.NewNodeControllerFromClient(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
if err != nil { if err != nil {

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
@ -124,7 +125,7 @@ type NodeController struct {
maximumGracePeriod time.Duration maximumGracePeriod time.Duration
recorder record.EventRecorder recorder record.EventRecorder
// Pod framework and store // Pod framework and store
podController *framework.Controller podController framework.ControllerInterface
podStore cache.StoreToPodLister podStore cache.StoreToPodLister
// Node framework and store // Node framework and store
nodeController *framework.Controller nodeController *framework.Controller
@ -140,6 +141,13 @@ type NodeController struct {
computeZoneStateFunc func(nodeConditions []*api.NodeCondition) zoneState computeZoneStateFunc func(nodeConditions []*api.NodeCondition) zoneState
zoneStates map[string]zoneState zoneStates map[string]zoneState
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewDaemonSetsController(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedIndexInformer
} }
// NewNodeController returns a new node controller to sync instances from cloudprovider. // NewNodeController returns a new node controller to sync instances from cloudprovider.
@ -147,6 +155,7 @@ type NodeController struct {
// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes // podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
// currently, this should be handled as a fatal error. // currently, this should be handled as a fatal error.
func NewNodeController( func NewNodeController(
podInformer framework.SharedIndexInformer,
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
kubeClient clientset.Interface, kubeClient clientset.Interface,
podEvictionTimeout time.Duration, podEvictionTimeout time.Duration,
@ -209,30 +218,12 @@ func NewNodeController(
zoneStates: make(map[string]zoneState), zoneStates: make(map[string]zoneState),
} }
nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer( podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
&cache.ListWatch{ AddFunc: nc.maybeDeleteTerminatingPod,
ListFunc: func(options api.ListOptions) (runtime.Object, error) { UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
return nc.kubeClient.Core().Pods(api.NamespaceAll).List(options) })
}, nc.podStore.Indexer = podInformer.GetIndexer()
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { nc.podController = podInformer.GetController()
return nc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
nc.maybeDeleteTerminatingPod(obj)
},
UpdateFunc: func(_, obj interface{}) {
nc.maybeDeleteTerminatingPod(obj)
},
},
// We don't need to build a index for podStore here actually, but build one for consistency.
// It will ensure that if people start making use of the podStore in more specific ways,
// they'll get the benefits they expect. It will also reserve the name for future refactorings.
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{} nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
if nc.allocateNodeCIDRs { if nc.allocateNodeCIDRs {
@ -337,11 +328,36 @@ func NewNodeController(
return nc, nil return nc, nil
} }
func NewNodeControllerFromClient(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool) (*NodeController, error) {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc())
nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod,
nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
if err != nil {
return nil, err
}
nc.internalPodInformer = podInformer
return nc, nil
}
// Run starts an asynchronous loop that monitors the status of cluster nodes. // Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) { func (nc *NodeController) Run(period time.Duration) {
go nc.nodeController.Run(wait.NeverStop) go nc.nodeController.Run(wait.NeverStop)
go nc.podController.Run(wait.NeverStop) go nc.podController.Run(wait.NeverStop)
go nc.daemonSetController.Run(wait.NeverStop) go nc.daemonSetController.Run(wait.NeverStop)
if nc.internalPodInformer != nil {
nc.internalPodInformer.Run(wait.NeverStop)
}
// Incorporate the results of node status pushed from kubelet to master. // Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() { go wait.Until(func() {

View File

@ -600,7 +600,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
} }
for _, item := range table { for _, item := range table {
nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod, evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
@ -673,7 +673,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}), Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
deleteWaitChan: make(chan struct{}), deleteWaitChan: make(chan struct{}),
} }
nodeController, _ := NewNodeController(nil, fnh, 10*time.Minute, nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute,
testRateLimiterQPS, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false) testNodeMonitorPeriod, nil, nil, 0, false)
@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
} }
for i, item := range table { for i, item := range table {
nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {
@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
} }
for i, item := range table { for i, item := range table {
nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {
@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
} }
nodeController, _ := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false) testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
}, },
} }
nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false) nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc) nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{ nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
newPod("b", "bar"), newPod("b", "bar"),
newPod("c", "gone"), newPod("c", "gone"),
} }
nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false) nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store.Add(newNode("foo")) nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar")) nc.nodeStore.Store.Add(newNode("bar"))