diff --git a/pkg/controller/node/ipam/cidr_allocator.go b/pkg/controller/node/ipam/cidr_allocator.go
index d4c44a6c930..e6c7f47869c 100644
--- a/pkg/controller/node/ipam/cidr_allocator.go
+++ b/pkg/controller/node/ipam/cidr_allocator.go
@@ -31,7 +31,11 @@ type nodeAndCIDR struct {
 type CIDRAllocatorType string
 
 const (
+	// RangeAllocatorType is the allocator that uses an internal CIDR
+	// range allocator to do node CIDR range allocations.
 	RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
+	// CloudAllocatorType is the allocator that uses cloud platform
+	// support to do node CIDR range allocations.
 	CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
 )
 
diff --git a/pkg/controller/node/ipam/cidrset/cidr_set.go b/pkg/controller/node/ipam/cidrset/cidr_set.go
index 158427cacc7..ce08dd8af3f 100644
--- a/pkg/controller/node/ipam/cidrset/cidr_set.go
+++ b/pkg/controller/node/ipam/cidrset/cidr_set.go
@@ -25,6 +25,8 @@ import (
 	"sync"
 )
 
+// CidrSet manages a set of CIDR ranges from which blocks of IPs can
+// be allocated.
 type CidrSet struct {
 	sync.Mutex
 	clusterCIDR *net.IPNet
@@ -46,10 +48,13 @@ const (
 )
 
 var (
+	// ErrCIDRRangeNoCIDRsRemaining occurs when there is no more space
+	// to allocate CIDR ranges.
 	ErrCIDRRangeNoCIDRsRemaining = errors.New(
 		"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
 )
 
+// NewCIDRSet creates a new CidrSet.
 func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *CidrSet {
 	clusterMask := clusterCIDR.Mask
 	clusterMaskSize, _ := clusterMask.Size()
@@ -97,6 +102,7 @@ func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet {
 	}
 }
 
+// AllocateNext allocates the next free CIDR range.
 func (s *CidrSet) AllocateNext() (*net.IPNet, error) {
 	s.Lock()
 	defer s.Unlock()
@@ -166,6 +172,7 @@ func (s *CidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err
 	return begin, end, nil
 }
 
+// Release releases the given CIDR range.
 func (s *CidrSet) Release(cidr *net.IPNet) error {
 	begin, end, err := s.getBeginingAndEndIndices(cidr)
 	if err != nil {
@@ -179,6 +186,7 @@ func (s *CidrSet) Release(cidr *net.IPNet) error {
 	return nil
 }
 
+// Occupy marks the given CIDR range as used.
 func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) {
 	begin, end, err := s.getBeginingAndEndIndices(cidr)
 	if err != nil {
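Note for reviewers (not part of the patch): the sketch below shows how the cidrset API documented above is typically exercised — occupy ranges that are already in use, then allocate and release fresh ones. It is a minimal, illustrative example; the import path is assumed from the file's location in the tree, and error handling is trimmed.

```go
package main

import (
	"fmt"
	"net"

	"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" // assumed import path
)

func main() {
	// Cluster CIDR from which per-node /24 blocks are carved.
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/16")
	set := cidrset.NewCIDRSet(clusterCIDR, 24)

	// Mark a block that is already in use (e.g. recovered from existing nodes).
	_, used, _ := net.ParseCIDR("10.0.0.0/24")
	if err := set.Occupy(used); err != nil {
		fmt.Println("occupy failed:", err)
	}

	// AllocateNext never hands out an occupied block; it returns
	// ErrCIDRRangeNoCIDRsRemaining once the range is exhausted.
	next, err := set.AllocateNext()
	if err != nil {
		fmt.Println("allocate failed:", err)
		return
	}
	fmt.Println("allocated", next)

	// Return the block to the pool when its node goes away.
	_ = set.Release(next)
}
```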
diff --git a/pkg/controller/node/ipam/cidrset/cidr_set_test.go b/pkg/controller/node/ipam/cidrset/cidr_set_test.go
index 3efa7f2116a..fbd9886e53d 100644
--- a/pkg/controller/node/ipam/cidrset/cidr_set_test.go
+++ b/pkg/controller/node/ipam/cidrset/cidr_set_test.go
@@ -219,9 +219,9 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
 
 	// allocate all the CIDRs
 	var cidrs []*net.IPNet
-	var num_cidrs = 256
+	var numCIDRs = 256
 
-	for i := 0; i < num_cidrs; i++ {
+	for i := 0; i < numCIDRs; i++ {
 		if c, err := a.AllocateNext(); err == nil {
 			cidrs = append(cidrs, c)
 		} else {
@@ -239,13 +239,13 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
 		a.Release(cidrs[i])
 	}
 	// occupy the last 128 CIDRs
-	for i := num_cidrs / 2; i < num_cidrs; i++ {
+	for i := numCIDRs / 2; i < numCIDRs; i++ {
 		a.Occupy(cidrs[i])
 	}
 
 	// allocate the first 128 CIDRs again
 	var rcidrs []*net.IPNet
-	for i := 0; i < num_cidrs/2; i++ {
+	for i := 0; i < numCIDRs/2; i++ {
 		if c, err := a.AllocateNext(); err == nil {
 			rcidrs = append(rcidrs, c)
 		} else {
@@ -258,7 +258,7 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
 	}
 
 	// check Occupy() work properly
-	for i := num_cidrs / 2; i < num_cidrs; i++ {
+	for i := numCIDRs / 2; i < numCIDRs; i++ {
 		rcidrs = append(rcidrs, cidrs[i])
 	}
 	if !reflect.DeepEqual(cidrs, rcidrs) {
diff --git a/pkg/controller/node/ipam/cloud_cidr_allocator.go b/pkg/controller/node/ipam/cloud_cidr_allocator.go
index 5ea78f739f5..9dfb9543f83 100644
--- a/pkg/controller/node/ipam/cloud_cidr_allocator.go
+++ b/pkg/controller/node/ipam/cloud_cidr_allocator.go
@@ -51,6 +51,7 @@ type cloudCIDRAllocator struct {
 
 var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
 
+// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
 func NewCloudCIDRAllocator(
 	client clientset.Interface,
 	cloud cloudprovider.Interface) (ca CIDRAllocator, err error) {
diff --git a/pkg/controller/node/scheduler/rate_limited_queue.go b/pkg/controller/node/scheduler/rate_limited_queue.go
index 33848d23054..984443ff52b 100644
--- a/pkg/controller/node/scheduler/rate_limited_queue.go
+++ b/pkg/controller/node/scheduler/rate_limited_queue.go
@@ -28,11 +28,14 @@ import (
 )
 
 const (
-	// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
+	// NodeStatusUpdateRetry controls the number of retries of writing
+	// a NodeStatus update.
 	NodeStatusUpdateRetry = 5
-	// controls how often NodeController will try to evict Pods from non-responsive Nodes.
+	// NodeEvictionPeriod controls how often NodeController will try to
+	// evict Pods from non-responsive Nodes.
 	NodeEvictionPeriod = 100 * time.Millisecond
-	// Burst value for all eviction rate limiters
+	// EvictionRateLimiterBurst is the burst value for all eviction rate
+	// limiters.
 	EvictionRateLimiterBurst = 1
 )
 
@@ -46,19 +49,26 @@ type TimedValue struct {
 }
 
 // now is used to test time
-var now func() time.Time = time.Now
+var now = time.Now
 
 // TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
 type TimedQueue []*TimedValue
 
-func (h TimedQueue) Len() int { return len(h) }
-func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
-func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+// Len is the length of the queue.
+func (h TimedQueue) Len() int { return len(h) }
+// Less returns true if queue[i] < queue[j].
+func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
+
+// Swap swaps index i and j.
+func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+// Push a new TimedValue onto the queue.
 func (h *TimedQueue) Push(x interface{}) {
 	*h = append(*h, x.(*TimedValue))
 }
 
+// Pop the lowest ProcessAt item.
 func (h *TimedQueue) Pop() interface{} {
 	old := *h
 	n := len(old)
@@ -67,16 +77,17 @@ func (h *TimedQueue) Pop() interface{} {
 	return x
 }
 
-// A FIFO queue which additionally guarantees that any element can be added only once until
-// it is removed.
+// UniqueQueue is a FIFO queue which additionally guarantees that any
+// element can be added only once until it is removed.
 type UniqueQueue struct {
 	lock  sync.Mutex
 	queue TimedQueue
 	set   sets.String
 }
 
-// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
-// Remove call. Returns true if new value was added.
+// Add a new value to the queue if it wasn't added before, or was
+// explicitly removed by the Remove call. Returns true if the new
+// value was added.
 func (q *UniqueQueue) Add(value TimedValue) bool {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -89,8 +100,9 @@ func (q *UniqueQueue) Add(value TimedValue) bool {
 	return true
 }
 
-// Replace replaces an existing value in the queue if it already exists, otherwise it does nothing.
-// Returns true if the item was found.
+// Replace replaces an existing value in the queue if it already
+// exists, otherwise it does nothing. Returns true if the item was
+// found.
 func (q *UniqueQueue) Replace(value TimedValue) bool {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -106,8 +118,9 @@ func (q *UniqueQueue) Replace(value TimedValue) bool {
 	return false
 }
 
-// Removes the value from the queue, but keeps it in the set, so it won't be added second time.
-// Returns true if something was removed.
+// RemoveFromQueue removes the value from the queue, but keeps it in
+// the set, so it won't be added a second time. Returns true if
+// something was removed.
 func (q *UniqueQueue) RemoveFromQueue(value string) bool {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -124,8 +137,9 @@ func (q *UniqueQueue) RemoveFromQueue(value string) bool {
 	return false
 }
 
-// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
-// of the given value. If the value is not present does nothing and returns false.
+// Remove the value from the queue, so a Get() call won't return it,
+// and allow subsequent addition of the given value. If the value is
+// not present it does nothing and returns false.
 func (q *UniqueQueue) Remove(value string) bool {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -143,7 +157,7 @@ func (q *UniqueQueue) Remove(value string) bool {
 	return true
 }
 
-// Returns the oldest added value that wasn't returned yet.
+// Get returns the oldest added value that wasn't returned yet.
 func (q *UniqueQueue) Get() (TimedValue, bool) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -155,7 +169,8 @@ func (q *UniqueQueue) Get() (TimedValue, bool) {
 	return *result, true
 }
 
-// Head returns the oldest added value that wasn't returned yet without removing it.
+// Head returns the oldest added value that wasn't returned yet
+// without removing it.
 func (q *UniqueQueue) Head() (TimedValue, bool) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -166,7 +181,8 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
 	return *result, true
 }
 
-// Clear removes all items from the queue and duplication preventing set.
+// Clear removes all items from the queue and the
+// duplication-preventing set.
 func (q *UniqueQueue) Clear() {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -178,15 +194,16 @@ func (q *UniqueQueue) Clear() {
 	}
 }
 
-// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
-// of execution. It is also rate limited.
+// RateLimitedTimedQueue is a unique item priority queue ordered by
+// the expected next time of execution. It is also rate limited.
 type RateLimitedTimedQueue struct {
 	queue       UniqueQueue
 	limiterLock sync.Mutex
 	limiter     flowcontrol.RateLimiter
 }
 
-// Creates new queue which will use given RateLimiter to oversee execution.
+// NewRateLimitedTimedQueue creates a new queue which will use the
+// given RateLimiter to oversee execution.
 func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
 	return &RateLimitedTimedQueue{
 		queue: UniqueQueue{
@@ -197,18 +214,21 @@ func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimed
 	}
 }
 
-// ActionFunc takes a timed value and returns false if the item must be retried, with an optional
-// time.Duration if some minimum wait interval should be used.
+// ActionFunc takes a timed value and returns false if the item must
+// be retried, with an optional time.Duration if some minimum wait
+// interval should be used.
 type ActionFunc func(TimedValue) (bool, time.Duration)
 
-// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
-// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
-// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
-// time to execute the next item in the queue. The same value is processed only once unless
-// Remove is explicitly called on it (it's done by the cancelPodEviction function in NodeController
-// when Node becomes Ready again)
-// TODO: figure out a good way to do garbage collection for all Nodes that were removed from
-// the cluster.
+// Try processes the queue. Ends prematurely if RateLimiter forbids an
+// action and leak is true. Otherwise, requeues the item to be
+// processed. Each value is processed once if fn returns true,
+// otherwise it is added back to the queue. The returned remaining is
+// used to identify the minimum time to execute the next item in the
+// queue. The same value is processed only once unless Remove is
+// explicitly called on it (this is done by the cancelPodEviction
+// function in NodeController when a Node becomes Ready again).
+// TODO: figure out a good way to do garbage collection for all Nodes
+// that were removed from the cluster.
 func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 	val, ok := q.queue.Head()
 	q.limiterLock.Lock()
@@ -236,8 +256,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 	}
 }
 
-// Adds value to the queue to be processed. Won't add the same value(comparsion by value) a second time
-// if it was already added and not removed.
+// Add value to the queue to be processed. Won't add the same value
+// (comparison by value) a second time if it was already added and
+// not removed.
 func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
 	now := now()
 	return q.queue.Add(TimedValue{
@@ -248,17 +269,19 @@ func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
 	})
 }
 
-// Removes Node from the Evictor. The Node won't be processed until added again.
+// Remove Node from the Evictor. The Node won't be processed until
+// added again.
 func (q *RateLimitedTimedQueue) Remove(value string) bool {
 	return q.queue.Remove(value)
 }
 
-// Removes all items from the queue
+// Clear removes all items from the queue
 func (q *RateLimitedTimedQueue) Clear() {
 	q.queue.Clear()
 }
 
-// SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
+// SwapLimiter safely swaps the current limiter for this queue with
+// the passed one if capacities or QPS values differ.
 func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
 	q.limiterLock.Lock()
 	defer q.limiterLock.Unlock()
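Note for reviewers (not part of the patch): since most of the rate_limited_queue.go changes above are doc comments, a small usage sketch of the evictor queue may help. It is illustrative only; the import paths and the token-bucket limiter constructor are assumptions based on how NodeController wires this queue up.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"               // assumed import path
	"k8s.io/kubernetes/pkg/controller/node/scheduler" // assumed import path
)

func main() {
	// A token-bucket limiter: at most 10 evictions per second, with the
	// burst value exported above.
	limiter := flowcontrol.NewTokenBucketRateLimiter(10, scheduler.EvictionRateLimiterBurst)
	evictor := scheduler.NewRateLimitedTimedQueue(limiter)

	// The same value is queued only once until it is removed.
	evictor.Add("node-1", "uid-1")
	evictor.Add("node-1", "uid-1") // no-op: already queued

	// Try drains whatever the limiter allows. Returning true marks the
	// item done; returning false requeues it with an optional wait.
	evictor.Try(func(v scheduler.TimedValue) (bool, time.Duration) {
		fmt.Println("evicting pods from", v.Value)
		return true, 0
	})

	// A node that became Ready again is taken out of the evictor.
	evictor.Remove("node-1")
}
```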
diff --git a/pkg/controller/node/scheduler/taint_controller.go b/pkg/controller/node/scheduler/taint_controller.go
index e51f3d3609c..9892defc5fe 100644
--- a/pkg/controller/node/scheduler/taint_controller.go
+++ b/pkg/controller/node/scheduler/taint_controller.go
@@ -325,9 +325,8 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
 		startTime = scheduledEviction.CreatedAt
 		if startTime.Add(minTolerationTime).Before(triggerTime) {
 			return
-		} else {
-			tc.cancelWorkWithEvent(podNamespacedName)
 		}
+		tc.cancelWorkWithEvent(podNamespacedName)
 	}
 	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
 }
diff --git a/pkg/controller/node/util/controller_utils.go b/pkg/controller/node/util/controller_utils.go
index 958b343ef89..8365a0686a9 100644
--- a/pkg/controller/node/util/controller_utils.go
+++ b/pkg/controller/node/util/controller_utils.go
@@ -46,15 +46,18 @@ import (
 )
 
 var (
+	// ErrCloudInstance occurs when the cloud provider does not support
+	// the Instances API.
 	ErrCloudInstance = errors.New("cloud provider doesn't support instances")
-
-	// The minimum kubelet version for which the nodecontroller
-	// can safely flip pod.Status to NotReady.
+	// podStatusReconciliationVersion is the minimum kubelet version
+	// for which the nodecontroller can safely flip pod.Status to
+	// NotReady.
 	podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0")
 )
 
-// DeletePods will delete all pods from master running on given node, and return true
-// if any pods were deleted, or were found pending deletion.
+// DeletePods will delete all pods from the master running on the
+// given node, and return true if any pods were deleted, or were
+// found pending deletion.
 func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
 	remaining := false
 	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
@@ -109,8 +112,9 @@ func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
 	return remaining, nil
 }
 
-// SetPodTerminationReason attempts to set a reason and message in the pod status, updates it in the apiserver,
-// and returns an error if it encounters one.
+// SetPodTerminationReason attempts to set a reason and message in the
+// pod status, updates it in the apiserver, and returns an error if it
+// encounters one.
 func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
 	if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
 		return pod, nil
@@ -127,6 +131,7 @@ func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa
 	return updatedPod, nil
 }
 
+// ForcefullyDeletePod deletes the pod immediately.
 func ForcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
 	var zero int64
 	glog.Infof("NodeController is force deleting Pod: %v:%v", pod.Namespace, pod.Name)
@@ -137,8 +142,8 @@ func ForcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
 	return err
 }
 
-// ForcefullyDeleteNode immediately the node. The pods on the node are cleaned
-// up by the podGC.
+// ForcefullyDeleteNode deletes the node immediately. The pods on the
+// node are cleaned up by the podGC.
 func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
 	if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
 		return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
@@ -146,8 +151,8 @@ func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error
 	return nil
 }
 
-// update ready status of all pods running on given node from master
-// return true if success
+// MarkAllPodsNotReady updates the ready status of all pods running
+// on the given node from the master.
 func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
 	// Don't set pods to NotReady if the kubelet is running a version that
 	// doesn't understand how to correct readiness.
@@ -207,6 +212,8 @@ func NodeRunningOutdatedKubelet(node *v1.Node) bool {
 	return false
 }
 
+// NodeExistsInCloudProvider returns true if the node exists in the
+// cloud provider.
 func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
 	instances, ok := cloud.Instances()
 	if !ok {
@@ -221,6 +228,7 @@ func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.Nod
 	return true, nil
 }
 
+// RecordNodeEvent records an event related to a node.
 func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
 	ref := &v1.ObjectReference{
 		Kind:      "Node",
@@ -232,20 +240,22 @@ func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
 	recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
 }
 
-func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) {
+// RecordNodeStatusChange records an event related to a node status change.
+func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
 	ref := &v1.ObjectReference{
 		Kind:      "Node",
 		Name:      node.Name,
 		UID:       node.UID,
 		Namespace: "",
 	}
-	glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
+	glog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
 	// TODO: This requires a transaction, either both node status is updated
 	// and event is recorded or neither should happen, see issue #6055.
-	recorder.Eventf(ref, v1.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
+	recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
 }
 
-// Returns true in case of success and false otherwise
+// SwapNodeControllerTaint swaps the given taints on the node,
+// returning true in case of success and false otherwise.
 func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
 	taintToAdd.TimeAdded = metav1.Now()
 	err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintToAdd)
@@ -274,6 +284,7 @@ func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintTo
 	return true
 }
 
+// CreateAddNodeHandler creates an add node handler.
 func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
 	return func(originalObj interface{}) {
 		obj, err := scheme.Scheme.DeepCopy(originalObj)
@@ -289,6 +300,7 @@ func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
 	}
 }
 
+// CreateUpdateNodeHandler creates a node update handler.
 func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
 	return func(origOldObj, origNewObj interface{}) {
 		oldObj, err := scheme.Scheme.DeepCopy(origOldObj)
@@ -310,6 +322,7 @@ func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb
 	}
 }
 
+// CreateDeleteNodeHandler creates a delete node handler.
 func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
 	return func(originalObj interface{}) {
 		obj, err := scheme.Scheme.DeepCopy(originalObj)