switch node controller to shared informers

deads2k 2016-09-23 12:01:58 -04:00
parent 24031f50d6
commit 0961784a9b
5 changed files with 74 additions and 89 deletions

View File

@@ -240,7 +240,9 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <
 	if err != nil {
 		glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
 	}
-	nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, client("node-controller"),
+	nodeController, err := nodecontroller.NewNodeController(
+		sharedInformers.Pods(), sharedInformers.Nodes(), sharedInformers.DaemonSets(),
+		cloud, client("node-controller"),
 		s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration,
 		s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
 		int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
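The hunk above swaps the single pod informer argument for the three typed shared informers. A minimal sketch of how a caller wires this up, assuming the internal informers.NewSharedInformerFactory and the factory's Start method used elsewhere in this commit; the package name example, the helper startNodeController, and all literal settings are illustrative rather than the controller-manager's actual values:

// Sketch only: wiring the node controller to a shared informer factory.
package example

import (
	"time"

	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/informers"
	nodecontroller "k8s.io/kubernetes/pkg/controller/node"
)

func startNodeController(client clientset.Interface, stop <-chan struct{}) error {
	// One factory per process; every controller that asks for Pods(), Nodes(),
	// or DaemonSets() shares the same underlying informer and watch.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)

	nc, err := nodecontroller.NewNodeController(
		factory.Pods(), factory.Nodes(), factory.DaemonSets(),
		nil, // cloud provider; omitted in this sketch
		client,
		5*time.Minute,  // pod eviction timeout
		0.1, 0.01,      // eviction QPS / secondary eviction QPS
		50, 0.55,       // large-cluster threshold / unhealthy-zone threshold
		40*time.Second, // node monitor grace period
		time.Minute,    // node startup grace period
		5*time.Second,  // node monitor period
		nil, nil,       // cluster CIDR, service CIDR
		24, false,      // node CIDR mask size, allocate node CIDRs
	)
	if err != nil {
		return err
	}
	nc.Run()

	// Start the shared informers only after all handlers are registered.
	factory.Start(stop)
	return nil
}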

View File

@@ -26,17 +26,14 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
-	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/cloudprovider"
-	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/metrics"
@@ -45,7 +42,6 @@ import (
 	"k8s.io/kubernetes/pkg/util/system"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/version"
-	"k8s.io/kubernetes/pkg/watch"
 )
 func init() {
@@ -135,15 +131,9 @@ type NodeController struct {
 	// The maximum duration before a pod evicted from a node can be forcefully terminated.
 	maximumGracePeriod time.Duration
 	recorder           record.EventRecorder
-	// Pod framework and store
-	podController cache.ControllerInterface
-	podStore      cache.StoreToPodLister
-	// Node framework and store
-	nodeController *cache.Controller
-	nodeStore      cache.StoreToNodeLister
-	// DaemonSet framework and store
-	daemonSetController *cache.Controller
-	daemonSetStore      cache.StoreToDaemonSetLister
+	podStore       cache.StoreToPodLister
+	nodeStore      cache.StoreToNodeLister
+	daemonSetStore cache.StoreToDaemonSetLister
 	// allocate/recycle CIDRs for node if allocateNodeCIDRs == true
 	cidrAllocator CIDRAllocator
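With the per-type framework/controller fields gone from the struct, reads go through the listers that the shared informers keep in sync. A throwaway sketch, not part of the commit, using the StoreToNodeLister already imported here from pkg/client/cache; the function name logCachedNodes is illustrative:

// Sketch only: reading nodes from the shared cache via the lister field.
func logCachedNodes(nodeStore cache.StoreToNodeLister) {
	nodes, err := nodeStore.List() // served from the informer's cache, no API round trip
	if err != nil {
		glog.Errorf("listing cached nodes: %v", err)
		return
	}
	for i := range nodes.Items {
		glog.V(4).Infof("cached node: %s", nodes.Items[i].Name)
	}
}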
@@ -172,7 +162,9 @@ type NodeController struct {
 // podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
 // currently, this should be handled as a fatal error.
 func NewNodeController(
-	podInformer cache.SharedIndexInformer,
+	podInformer informers.PodInformer,
+	nodeInformer informers.NodeInformer,
+	daemonSetInformer informers.DaemonSetInformer,
 	cloud cloudprovider.Interface,
 	kubeClient clientset.Interface,
 	podEvictionTimeout time.Duration,
@@ -241,21 +233,25 @@ func NewNodeController(
 	nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
 	nc.computeZoneStateFunc = nc.ComputeZoneState
-	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    nc.maybeDeleteTerminatingPod,
 		UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
 	})
-	nc.podStore.Indexer = podInformer.GetIndexer()
-	nc.podController = podInformer.GetController()
+	nc.podStore = *podInformer.Lister()
 	nodeEventHandlerFuncs := cache.ResourceEventHandlerFuncs{}
 	if nc.allocateNodeCIDRs {
 		nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				node := obj.(*api.Node)
-				err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
+			AddFunc: func(originalObj interface{}) {
+				obj, err := api.Scheme.DeepCopy(originalObj)
 				if err != nil {
-					glog.Errorf("Error allocating CIDR: %v", err)
+					utilruntime.HandleError(err)
+					return
+				}
+				node := obj.(*api.Node)
+				if err := nc.cidrAllocator.AllocateOrOccupyCIDR(node); err != nil {
+					utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
 				}
 			},
 			UpdateFunc: func(_, obj interface{}) {
@@ -280,49 +276,36 @@ func NewNodeController(
 				// state is correct.
 				// Restart of NC fixes the issue.
 				if node.Spec.PodCIDR == "" {
-					err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
+					nodeCopy, err := api.Scheme.Copy(node)
 					if err != nil {
-						glog.Errorf("Error allocating CIDR: %v", err)
+						utilruntime.HandleError(err)
+						return
+					}
+					if err := nc.cidrAllocator.AllocateOrOccupyCIDR(nodeCopy.(*api.Node)); err != nil {
+						utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
 					}
 				}
 			},
-			DeleteFunc: func(obj interface{}) {
-				node := obj.(*api.Node)
-				err := nc.cidrAllocator.ReleaseCIDR(node)
+			DeleteFunc: func(originalObj interface{}) {
+				obj, err := api.Scheme.DeepCopy(originalObj)
 				if err != nil {
+					utilruntime.HandleError(err)
+					return
+				}
+				node := obj.(*api.Node)
+				if err := nc.cidrAllocator.ReleaseCIDR(node); err != nil {
 					glog.Errorf("Error releasing CIDR: %v", err)
 				}
 			},
 		}
 	}
-	nc.nodeStore.Store, nc.nodeController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return nc.kubeClient.Core().Nodes().List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return nc.kubeClient.Core().Nodes().Watch(options)
-			},
-		},
-		&api.Node{},
-		controller.NoResyncPeriodFunc(),
-		nodeEventHandlerFuncs,
-	)
+	nodeInformer.Informer().AddEventHandler(nodeEventHandlerFuncs)
+	nc.nodeStore = *nodeInformer.Lister()
-	nc.daemonSetStore.Store, nc.daemonSetController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return nc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return nc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(options)
-			},
-		},
-		&extensions.DaemonSet{},
-		controller.NoResyncPeriodFunc(),
-		cache.ResourceEventHandlerFuncs{},
-	)
+	nc.daemonSetStore = *daemonSetInformer.Lister()
 	if allocateNodeCIDRs {
 		var nodeList *api.NodeList
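The handlers above now deep-copy the objects handed to them before doing anything with them, since objects delivered by a shared informer live in a cache shared with every other consumer. A small sketch of that pattern in isolation; the helper nodeFromEvent is illustrative and not part of the commit:

// Sketch only: copy-before-use for objects coming out of a shared informer cache.
func nodeFromEvent(obj interface{}) (*api.Node, error) {
	copied, err := api.Scheme.DeepCopy(obj)
	if err != nil {
		return nil, err
	}
	node, ok := copied.(*api.Node)
	if !ok {
		return nil, fmt.Errorf("expected *api.Node, got %T", copied)
	}
	return node, nil // safe to mutate; the cached original is untouched
}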
@@ -351,41 +334,8 @@ func NewNodeController(
 	return nc, nil
 }
-func NewNodeControllerFromClient(
-	cloud cloudprovider.Interface,
-	kubeClient clientset.Interface,
-	podEvictionTimeout time.Duration,
-	evictionLimiterQPS float32,
-	secondaryEvictionLimiterQPS float32,
-	largeClusterThreshold int32,
-	unhealthyZoneThreshold float32,
-	nodeMonitorGracePeriod time.Duration,
-	nodeStartupGracePeriod time.Duration,
-	nodeMonitorPeriod time.Duration,
-	clusterCIDR *net.IPNet,
-	serviceCIDR *net.IPNet,
-	nodeCIDRMaskSize int,
-	allocateNodeCIDRs bool) (*NodeController, error) {
-	podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
-	nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS,
-		largeClusterThreshold, unhealthyZoneThreshold, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR,
-		serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
-	if err != nil {
-		return nil, err
-	}
-	nc.internalPodInformer = podInformer
-	return nc, nil
-}
 // Run starts an asynchronous loop that monitors the status of cluster nodes.
 func (nc *NodeController) Run() {
-	go nc.nodeController.Run(wait.NeverStop)
-	go nc.podController.Run(wait.NeverStop)
-	go nc.daemonSetController.Run(wait.NeverStop)
-	if nc.internalPodInformer != nil {
-		go nc.internalPodInformer.Run(wait.NeverStop)
-	}
 	// Incorporate the results of node status pushed from kubelet to master.
 	go wait.Until(func() {
 		if err := nc.monitorNodeStatus(); err != nil {
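Run() no longer spins up its own informer goroutines, so informer lifecycle moves to whoever builds the controller: the shared factory is started after handlers are registered, and callers that need warm caches can gate on HasSynced. A hedged sketch under those assumptions; the helper startAndWaitForCaches, the poll interval, and the timeout are illustrative, and it relies only on the factory's Start method and the informers' HasSynced shown or used elsewhere in this commit:

// Sketch only: starting the shared informers and waiting for their caches.
func startAndWaitForCaches(factory informers.SharedInformerFactory, stop <-chan struct{}) error {
	podsSynced := factory.Pods().Informer().HasSynced
	nodesSynced := factory.Nodes().Informer().HasSynced
	daemonSetsSynced := factory.DaemonSets().Informer().HasSynced

	factory.Start(stop) // starts every informer that has been requested from the factory

	return wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
		return podsSynced() && nodesSynced() && daemonSetsSynced(), nil
	})
}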

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package node
 import (
+	"net"
 	"strings"
 	"testing"
 	"time"
@@ -26,8 +27,12 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	"k8s.io/kubernetes/pkg/cloudprovider"
 	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
+	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/diff"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -42,6 +47,34 @@ const (
 	testUnhealtyThreshold = float32(0.55)
 )
+func NewNodeControllerFromClient(
+	cloud cloudprovider.Interface,
+	kubeClient clientset.Interface,
+	podEvictionTimeout time.Duration,
+	evictionLimiterQPS float32,
+	secondaryEvictionLimiterQPS float32,
+	largeClusterThreshold int32,
+	unhealthyZoneThreshold float32,
+	nodeMonitorGracePeriod time.Duration,
+	nodeStartupGracePeriod time.Duration,
+	nodeMonitorPeriod time.Duration,
+	clusterCIDR *net.IPNet,
+	serviceCIDR *net.IPNet,
+	nodeCIDRMaskSize int,
+	allocateNodeCIDRs bool) (*NodeController, error) {
+	factory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
+	nc, err := NewNodeController(factory.Pods(), factory.Nodes(), factory.DaemonSets(), cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS,
+		largeClusterThreshold, unhealthyZoneThreshold, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR,
+		serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
+	if err != nil {
+		return nil, err
+	}
+	return nc, nil
+}
 func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
 	evictionTimeout := 10 * time.Minute
@@ -1701,7 +1734,7 @@ func TestCheckPod(t *testing.T) {
 		},
 	}
-	nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
+	nc, _ := NewNodeControllerFromClient(nil, fake.NewSimpleClientset(), 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
 	nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
 	nc.nodeStore.Store.Add(&api.Node{
 		ObjectMeta: api.ObjectMeta{
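Because the test-only helper now builds a real shared informer factory from the clientset, a nil client no longer works, hence the switch to fake.NewSimpleClientset() above. A sketch of a typical test setup built on that helper, mirroring the lines in this hunk; newTestNodeController is an illustrative name, not part of the commit:

// Sketch only: constructing the controller in tests against a fake clientset.
func newTestNodeController(t *testing.T) *NodeController {
	nc, err := NewNodeControllerFromClient(nil, fake.NewSimpleClientset(),
		0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
	if err != nil {
		t.Fatalf("unexpected error creating node controller: %v", err)
	}
	// Tests can still seed the lister-backed stores directly.
	nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	return nc
}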

View File

@@ -202,7 +202,7 @@ func (m *FakeNodeHandler) PatchStatus(nodeName string, data []byte) (*api.Node,
 }
 func (m *FakeNodeHandler) Watch(opts api.ListOptions) (watch.Interface, error) {
-	return nil, nil
+	return watch.NewFake(), nil
 }
 func (m *FakeNodeHandler) Patch(name string, pt api.PatchType, data []byte, subresources ...string) (*api.Node, error) {
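The fake handler's Watch now returns a usable fake watcher instead of nil, presumably so that anything consuming it as a real client (such as a reflector behind a shared informer) gets a valid watch.Interface rather than a nil it would trip over. A standalone sketch of what that fake watcher provides; the function exampleFakeWatch is illustrative:

// Sketch only: watch.NewFake() gives tests a real watch.Interface they can drive.
func exampleFakeWatch() {
	fw := watch.NewFake()
	defer fw.Stop()

	// FakeWatcher sends on an unbuffered channel, so deliver the event from
	// another goroutine while the consumer reads ResultChan().
	go fw.Add(&api.Node{ObjectMeta: api.ObjectMeta{Name: "node0"}})

	event := <-fw.ResultChan()
	fmt.Printf("got %s for %s\n", event.Type, event.Object.(*api.Node).Name)
}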

View File

@@ -9148,7 +9148,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
 	"v1.Node": {
 		Schema: spec.Schema{
 			SchemaProps: spec.SchemaProps{
-				Description: "Node is a worker node in Kubernetes, formerly known as minion. Each node will have a unique identifier in the cache (i.e. in etcd).",
+				Description: "Node is a worker node in Kubernetes. Each node will have a unique identifier in the cache (i.e. in etcd).",
 				Properties: map[string]spec.Schema{
 					"metadata": {
 						SchemaProps: spec.SchemaProps{