golint fixes

Bowei Du 2017-08-08 17:04:50 -07:00
parent 27854fa0d8
commit 61c43f6468
7 changed files with 108 additions and 60 deletions

View File

@ -31,7 +31,11 @@ type nodeAndCIDR struct {
type CIDRAllocatorType string
const (
// RangeAllocatorType is the allocator that uses an internal CIDR
// range allocator to do node CIDR range allocations.
RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
// CloudAllocatorType is the allocator that uses cloud platform
// support to do node CIDR range allocations.
CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
)
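
The typed string here lets callers select an allocator implementation by constant instead of by bare string. A minimal, self-contained sketch of how such a dispatch might look (the newAllocatorName helper is hypothetical, not part of this commit):

package main

import "fmt"

// CIDRAllocatorType mirrors the typed string from the diff above.
type CIDRAllocatorType string

const (
	RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
	CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
)

// newAllocatorName is a hypothetical dispatcher: the typed constant keeps
// the switch exhaustive and catches typos at the call site.
func newAllocatorName(t CIDRAllocatorType) (string, error) {
	switch t {
	case RangeAllocatorType:
		return "internal CIDR range allocator", nil
	case CloudAllocatorType:
		return "cloud-platform CIDR allocator", nil
	default:
		return "", fmt.Errorf("unknown CIDR allocator type %q", t)
	}
}

func main() {
	name, err := newAllocatorName(RangeAllocatorType)
	fmt.Println(name, err)
}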

View File

@ -25,6 +25,8 @@ import (
"sync"
)
// CidrSet manages a set of CIDR ranges from which blocks of IPs can
// be allocated.
type CidrSet struct {
sync.Mutex
clusterCIDR *net.IPNet
@ -46,10 +48,13 @@ const (
)
var (
// ErrCIDRRangeNoCIDRsRemaining occurs when there is no more space
// to allocate CIDR ranges.
ErrCIDRRangeNoCIDRsRemaining = errors.New(
"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
)
// NewCIDRSet creates a new CidrSet.
func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *CidrSet {
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
@ -97,6 +102,7 @@ func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet {
}
}
// AllocateNext allocates the next free CIDR range.
func (s *CidrSet) AllocateNext() (*net.IPNet, error) {
s.Lock()
defer s.Unlock()
@ -166,6 +172,7 @@ func (s *CidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err
return begin, end, nil
}
// Release releases the given CIDR range.
func (s *CidrSet) Release(cidr *net.IPNet) error {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
@ -179,6 +186,7 @@ func (s *CidrSet) Release(cidr *net.IPNet) error {
return nil
}
// Occupy marks the given CIDR range as used.
func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
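
The hunks above end mid-function, and the unexported indexToCIDRBlock body is not shown, but the constructor's mask arithmetic implies the mapping. A simplified IPv4-only reconstruction (my own sketch under that assumption, not the commit's code):

package main

import (
	"fmt"
	"net"
)

// indexToCIDRBlock is a simplified IPv4-only sketch of the mapping: block
// index i of size /subnetMaskSize inside clusterCIDR starts at the cluster
// base address plus i shifted left by the block's host bits.
func indexToCIDRBlock(clusterCIDR *net.IPNet, subnetMaskSize, index int) *net.IPNet {
	ip := clusterCIDR.IP.To4()
	base := uint32(ip[0])<<24 | uint32(ip[1])<<16 | uint32(ip[2])<<8 | uint32(ip[3])
	start := base + uint32(index)<<uint(32-subnetMaskSize)
	return &net.IPNet{
		IP:   net.IPv4(byte(start>>24), byte(start>>16), byte(start>>8), byte(start)),
		Mask: net.CIDRMask(subnetMaskSize, 32),
	}
}

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/8")
	clusterMaskSize, _ := clusterCIDR.Mask.Size()
	// A /8 cluster carved into /16 blocks yields 2^(16-8) = 256 blocks,
	// matching numCIDRs = 256 in the test file below.
	fmt.Println("blocks:", 1<<uint(16-clusterMaskSize))
	fmt.Println(indexToCIDRBlock(clusterCIDR, 16, 0))  // 10.0.0.0/16
	fmt.Println(indexToCIDRBlock(clusterCIDR, 16, 42)) // 10.42.0.0/16
}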

View File

@ -219,9 +219,9 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
// allocate all the CIDRs
var cidrs []*net.IPNet
-var num_cidrs = 256
+var numCIDRs = 256
-for i := 0; i < num_cidrs; i++ {
+for i := 0; i < numCIDRs; i++ {
if c, err := a.AllocateNext(); err == nil {
cidrs = append(cidrs, c)
} else {
@ -239,13 +239,13 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
a.Release(cidrs[i])
}
// occupy the last 128 CIDRs
-for i := num_cidrs / 2; i < num_cidrs; i++ {
+for i := numCIDRs / 2; i < numCIDRs; i++ {
a.Occupy(cidrs[i])
}
// allocate the first 128 CIDRs again
var rcidrs []*net.IPNet
-for i := 0; i < num_cidrs/2; i++ {
+for i := 0; i < numCIDRs/2; i++ {
if c, err := a.AllocateNext(); err == nil {
rcidrs = append(rcidrs, c)
} else {
@ -258,7 +258,7 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
}
// check Occupy() work properly
-for i := num_cidrs / 2; i < num_cidrs; i++ {
+for i := numCIDRs / 2; i < numCIDRs; i++ {
rcidrs = append(rcidrs, cidrs[i])
}
if !reflect.DeepEqual(cidrs, rcidrs) {

View File

@ -51,6 +51,7 @@ type cloudCIDRAllocator struct {
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
func NewCloudCIDRAllocator(
client clientset.Interface,
cloud cloudprovider.Interface) (ca CIDRAllocator, err error) {
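
The var _ CIDRAllocator = (*cloudCIDRAllocator)(nil) line above is Go's standard compile-time interface check. A tiny standalone illustration of the idiom (greeter and english are made-up names):

package main

import "fmt"

type greeter interface{ Greet() string }

type english struct{}

func (english) Greet() string { return "hello" }

// This assignment costs nothing at runtime, but the build breaks
// if *english ever stops satisfying greeter.
var _ greeter = (*english)(nil)

func main() { fmt.Println(english{}.Greet()) }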

View File

@ -28,11 +28,14 @@ import (
)
const (
-// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
+// NodeStatusUpdateRetry controls the number of retries of writing
+// NodeStatus update.
NodeStatusUpdateRetry = 5
-// controls how often NodeController will try to evict Pods from non-responsive Nodes.
+// NodeEvictionPeriod controls how often NodeController will try to
+// evict Pods from non-responsive Nodes.
NodeEvictionPeriod = 100 * time.Millisecond
-// Burst value for all eviction rate limiters
+// EvictionRateLimiterBurst is the burst value for all eviction rate
+// limiters
EvictionRateLimiterBurst = 1
)
@ -46,19 +49,26 @@ type TimedValue struct {
}
// now is used to test time
-var now func() time.Time = time.Now
+var now = time.Now
// TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
type TimedQueue []*TimedValue
-func (h TimedQueue) Len() int { return len(h) }
-func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
-func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+// Len is the length of the queue.
+func (h TimedQueue) Len() int { return len(h) }
+// Less returns true if queue[i] < queue[j].
+func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
+// Swap swaps index i and j.
+func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Push a new TimedValue on to the queue.
func (h *TimedQueue) Push(x interface{}) {
*h = append(*h, x.(*TimedValue))
}
// Pop the lowest ProcessAt item.
func (h *TimedQueue) Pop() interface{} {
old := *h
n := len(old)
@ -67,16 +77,17 @@ func (h *TimedQueue) Pop() interface{} {
return x
}
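
These five methods are exactly what container/heap's heap.Interface requires, so the evictor can push values in any order and always pop the lowest ProcessAt. A self-contained sketch of the same pattern; timedValue is pared down here (the real TimedValue carries more fields than ProcessAt):

package main

import (
	"container/heap"
	"fmt"
	"time"
)

// timedValue is pared down to the one field TimedQueue orders on.
type timedValue struct {
	Value     string
	ProcessAt time.Time
}

// timedQueue implements heap.Interface the same way TimedQueue does above.
type timedQueue []*timedValue

func (h timedQueue) Len() int           { return len(h) }
func (h timedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
func (h timedQueue) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *timedQueue) Push(x interface{}) { *h = append(*h, x.(*timedValue)) }

func (h *timedQueue) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	base := time.Now()
	q := &timedQueue{}
	heap.Push(q, &timedValue{Value: "later", ProcessAt: base.Add(2 * time.Second)})
	heap.Push(q, &timedValue{Value: "sooner", ProcessAt: base.Add(time.Second)})
	// heap.Pop always yields the lowest ProcessAt first.
	fmt.Println(heap.Pop(q).(*timedValue).Value) // sooner
	fmt.Println(heap.Pop(q).(*timedValue).Value) // later
}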
-// A FIFO queue which additionally guarantees that any element can be added only once until
-// it is removed.
+// UniqueQueue is a FIFO queue which additionally guarantees that any
+// element can be added only once until it is removed.
type UniqueQueue struct {
lock sync.Mutex
queue TimedQueue
set sets.String
}
-// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
-// Remove call. Returns true if new value was added.
+// Add a new value to the queue if it wasn't added before, or was
+// explicitly removed by the Remove call. Returns true if a new value
+// was added.
func (q *UniqueQueue) Add(value TimedValue) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -89,8 +100,9 @@ func (q *UniqueQueue) Add(value TimedValue) bool {
return true
}
-// Replace replaces an existing value in the queue if it already exists, otherwise it does nothing.
-// Returns true if the item was found.
+// Replace replaces an existing value in the queue if it already
+// exists, otherwise it does nothing. Returns true if the item was
+// found.
func (q *UniqueQueue) Replace(value TimedValue) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -106,8 +118,9 @@ func (q *UniqueQueue) Replace(value TimedValue) bool {
return false
}
-// Removes the value from the queue, but keeps it in the set, so it won't be added second time.
-// Returns true if something was removed.
+// RemoveFromQueue removes the value from the queue, but keeps it in
+// the set, so it won't be added a second time. Returns true if
+// something was removed.
func (q *UniqueQueue) RemoveFromQueue(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -124,8 +137,9 @@ func (q *UniqueQueue) RemoveFromQueue(value string) bool {
return false
}
-// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
-// of the given value. If the value is not present does nothing and returns false.
+// Remove removes the value from the queue, so a Get() call won't
+// return it, and allows subsequent addition of the given value. If
+// the value is not present, it does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -143,7 +157,7 @@ func (q *UniqueQueue) Remove(value string) bool {
return true
}
-// Returns the oldest added value that wasn't returned yet.
+// Get returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (TimedValue, bool) {
q.lock.Lock()
defer q.lock.Unlock()
@ -155,7 +169,8 @@ func (q *UniqueQueue) Get() (TimedValue, bool) {
return *result, true
}
-// Head returns the oldest added value that wasn't returned yet without removing it.
+// Head returns the oldest added value that wasn't returned yet
+// without removing it.
func (q *UniqueQueue) Head() (TimedValue, bool) {
q.lock.Lock()
defer q.lock.Unlock()
@ -166,7 +181,8 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
return *result, true
}
-// Clear removes all items from the queue and duplication preventing set.
+// Clear removes all items from the queue and duplication preventing
+// set.
func (q *UniqueQueue) Clear() {
q.lock.Lock()
defer q.lock.Unlock()
@ -178,15 +194,16 @@ func (q *UniqueQueue) Clear() {
}
}
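
The queue/set pair is what gives UniqueQueue its once-until-removed guarantee: RemoveFromQueue clears only the queue, so the set still blocks re-adds, while Remove clears both. A pared-down sketch of that rule (not the real implementation, which also carries TimedValue ordering and locking):

package main

import "fmt"

// uniqueQueue sketches the dedup rule above: the set remembers every
// value added, so a value can only be enqueued once until Remove
// clears its set entry.
type uniqueQueue struct {
	queue []string
	set   map[string]bool
}

// Add enqueues v only if it has never been added (or was fully Removed).
func (q *uniqueQueue) Add(v string) bool {
	if q.set[v] {
		return false
	}
	q.set[v] = true
	q.queue = append(q.queue, v)
	return true
}

// Remove drops v from both the queue and the set, allowing re-adds.
func (q *uniqueQueue) Remove(v string) {
	delete(q.set, v)
	for i, x := range q.queue {
		if x == v {
			q.queue = append(q.queue[:i], q.queue[i+1:]...)
			break
		}
	}
}

func main() {
	q := &uniqueQueue{set: map[string]bool{}}
	fmt.Println(q.Add("node-1")) // true
	fmt.Println(q.Add("node-1")) // false: still in the set
	q.Remove("node-1")
	fmt.Println(q.Add("node-1")) // true again after Remove
}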
-// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
-// of execution. It is also rate limited.
+// RateLimitedTimedQueue is a unique item priority queue ordered by
+// the expected next time of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
queue UniqueQueue
limiterLock sync.Mutex
limiter flowcontrol.RateLimiter
}
-// Creates new queue which will use given RateLimiter to oversee execution.
+// NewRateLimitedTimedQueue creates a new queue which will use the
+// given RateLimiter to oversee execution.
func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
return &RateLimitedTimedQueue{
queue: UniqueQueue{
@ -197,18 +214,21 @@ func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimed
}
}
-// ActionFunc takes a timed value and returns false if the item must be retried, with an optional
-// time.Duration if some minimum wait interval should be used.
+// ActionFunc takes a timed value and returns false if the item must
+// be retried, with an optional time.Duration if some minimum wait
+// interval should be used.
type ActionFunc func(TimedValue) (bool, time.Duration)
-// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
-// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
-// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
-// time to execute the next item in the queue. The same value is processed only once unless
-// Remove is explicitly called on it (it's done by the cancelPodEviction function in NodeController
-// when Node becomes Ready again)
-// TODO: figure out a good way to do garbage collection for all Nodes that were removed from
-// the cluster.
+// Try processes the queue. Ends prematurely if RateLimiter forbids
+// an action and leak is true. Otherwise, requeues the item to be
+// processed. Each value is processed once if fn returns true,
+// otherwise it is added back to the queue. The returned remaining is
+// used to identify the minimum time to execute the next item in the
+// queue. The same value is processed only once unless Remove is
+// explicitly called on it (it's done by the cancelPodEviction
+// function in NodeController when Node becomes Ready again).
+// TODO: figure out a good way to do garbage collection for all Nodes
+// that were removed from the cluster.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
q.limiterLock.Lock()
@ -236,8 +256,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
}
}
-// Adds value to the queue to be processed. Won't add the same value(comparsion by value) a second time
-// if it was already added and not removed.
+// Add value to the queue to be processed. Won't add the same value
+// (comparison by value) a second time if it was already added and
+// not removed.
func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
now := now()
return q.queue.Add(TimedValue{
@ -248,17 +269,19 @@ func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
})
}
-// Removes Node from the Evictor. The Node won't be processed until added again.
+// Remove Node from the Evictor. The Node won't be processed until
+// added again.
func (q *RateLimitedTimedQueue) Remove(value string) bool {
return q.queue.Remove(value)
}
-// Removes all items from the queue
+// Clear removes all items from the queue
func (q *RateLimitedTimedQueue) Clear() {
q.queue.Clear()
}
-// SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
+// SwapLimiter safely swaps current limiter for this queue with the
+// passed one if capacities or qps's differ.
func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
q.limiterLock.Lock()
defer q.limiterLock.Unlock()
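
Taken together, Head, Try, and Add describe a rate-limited work loop: take the head, consult the limiter, run the action, and requeue on failure. A condensed sketch of that loop against the real flowcontrol package (the import path is assumed from the flowcontrol.RateLimiter field above; queue details are simplified away):

package main

import (
	"fmt"

	"k8s.io/client-go/util/flowcontrol"
)

// tryPattern condenses the Try loop described above: pop a value, wait for
// the rate limiter, run fn, and requeue the value if fn reports a retry.
func tryPattern(values []string, limiter flowcontrol.RateLimiter, fn func(string) bool) {
	queue := append([]string(nil), values...)
	for len(queue) > 0 {
		v := queue[0]
		queue = queue[1:]
		if !limiter.TryAccept() {
			// The real Try may instead end prematurely here when leak is
			// true; this sketch just blocks until a token is available.
			limiter.Accept()
		}
		if !fn(v) {
			queue = append(queue, v) // requeue, as Try does when fn returns false
		}
	}
}

func main() {
	// 10 QPS with a burst of 1, echoing EvictionRateLimiterBurst above.
	limiter := flowcontrol.NewTokenBucketRateLimiter(10, 1)
	defer limiter.Stop()
	tryPattern([]string{"node-a", "node-b"}, limiter, func(v string) bool {
		fmt.Println("evicting pods from", v)
		return true
	})
}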

View File

@ -325,9 +325,8 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
-} else {
-tc.cancelWorkWithEvent(podNamespacedName)
}
+tc.cancelWorkWithEvent(podNamespacedName)
}
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
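
This hunk is golint's "redundant else" rewrite: when the if branch returns, the else wrapper only adds indentation. A before/after illustration with made-up names:

package main

import "fmt"

// before: golint flags the else, because the if branch already returns.
func beforeStyle(done bool) string {
	if done {
		return "done"
	} else { // golint: drop this else and outdent its block
		fmt.Println("still waiting")
	}
	return "pending"
}

// after: identical behavior, one less level of indentation.
func afterStyle(done bool) string {
	if done {
		return "done"
	}
	fmt.Println("still waiting")
	return "pending"
}

func main() {
	fmt.Println(beforeStyle(true), afterStyle(false))
}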

View File

@ -46,15 +46,18 @@ import (
)
var (
// ErrCloudInstance occurs when the cloud provider does not support
// the Instances API.
ErrCloudInstance = errors.New("cloud provider doesn't support instances")
-// The minimum kubelet version for which the nodecontroller
-// can safely flip pod.Status to NotReady.
+// podStatusReconciliationVersion is the minimum kubelet version
+// for which the nodecontroller can safely flip pod.Status to
+// NotReady.
podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0")
)
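
A sketch of the version gate this variable implies, using the same utilversion helpers (the k8s.io/apimachinery/pkg/util/version import path is an assumption; the helper may live elsewhere in this tree):

package main

import (
	"fmt"

	utilversion "k8s.io/apimachinery/pkg/util/version"
)

func main() {
	// The minimum version the nodecontroller trusts, per the diff above.
	minVersion := utilversion.MustParseSemantic("v1.2.0")
	// A kubelet version as it might be read from node.Status.NodeInfo.
	kubeletVersion, err := utilversion.ParseSemantic("v1.7.3")
	if err != nil {
		fmt.Println("unparseable kubelet version:", err)
		return
	}
	if kubeletVersion.AtLeast(minVersion) {
		fmt.Println("safe to flip pod.Status to NotReady")
	} else {
		fmt.Println("kubelet too old; skipping status reconciliation")
	}
}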
-// DeletePods will delete all pods from master running on given node, and return true
-// if any pods were deleted, or were found pending deletion.
+// DeletePods will delete all pods from master running on given node,
+// and return true if any pods were deleted, or were found pending
+// deletion.
func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
@ -109,8 +112,9 @@ func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
return remaining, nil
}
-// SetPodTerminationReason attempts to set a reason and message in the pod status, updates it in the apiserver,
-// and returns an error if it encounters one.
+// SetPodTerminationReason attempts to set a reason and message in the
+// pod status, updates it in the apiserver, and returns an error if it
+// encounters one.
func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
return pod, nil
@ -127,6 +131,7 @@ func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa
return updatedPod, nil
}
// ForcefullyDeletePod deletes the pod immediately.
func ForcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
var zero int64
glog.Infof("NodeController is force deleting Pod: %v:%v", pod.Namespace, pod.Name)
@ -137,8 +142,8 @@ func ForcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
return err
}
-// ForcefullyDeleteNode immediately the node. The pods on the node are cleaned
-// up by the podGC.
+// ForcefullyDeleteNode deletes the node immediately. The pods on the
+// node are cleaned up by the podGC.
func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
@ -146,8 +151,8 @@ func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error
return nil
}
-// update ready status of all pods running on given node from master
-// return true if success
+// MarkAllPodsNotReady updates the ready status of all pods running
+// on the given node from master. Returns an error on failure.
func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
// Don't set pods to NotReady if the kubelet is running a version that
// doesn't understand how to correct readiness.
@ -207,6 +212,8 @@ func NodeRunningOutdatedKubelet(node *v1.Node) bool {
return false
}
// NodeExistsInCloudProvider returns true if the node exists in the
// cloud provider.
func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
@ -221,6 +228,7 @@ func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.Nod
return true, nil
}
// RecordNodeEvent records an event related to a node.
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
ref := &v1.ObjectReference{
Kind: "Node",
@ -232,20 +240,22 @@ func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}
-func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) {
+// RecordNodeStatusChange records an event related to a node status change.
+func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: node.UID,
Namespace: "",
}
-glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
+glog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
-recorder.Eventf(ref, v1.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
+recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
}
-// Returns true in case of success and false otherwise
+// SwapNodeControllerTaint returns true in case of success and false
+// otherwise.
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
taintToAdd.TimeAdded = metav1.Now()
err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintToAdd)
@ -274,6 +284,7 @@ func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintTo
return true
}
// CreateAddNodeHandler creates an add node handler.
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
obj, err := scheme.Scheme.DeepCopy(originalObj)
@ -289,6 +300,7 @@ func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
}
}
// CreateUpdateNodeHandler creates a node update handler.
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
return func(origOldObj, origNewObj interface{}) {
oldObj, err := scheme.Scheme.DeepCopy(origOldObj)
@ -310,6 +322,7 @@ func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb
}
}
// CreateDeleteNodeHandler creates a delete node handler.
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
obj, err := scheme.Scheme.DeepCopy(originalObj)
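
These factories adapt typed node callbacks to the untyped func(obj interface{}) signatures informers expect, deep-copying the object first. A self-contained sketch of the same shape (node and createAddNodeHandler are stand-ins; the deep copy via scheme.Scheme is skipped):

package main

import "fmt"

// node stands in for *v1.Node in this sketch.
type node struct{ Name string }

// createAddNodeHandler mirrors the factory's shape: it adapts a typed
// callback to the untyped func(obj interface{}) an informer expects.
func createAddNodeHandler(f func(n *node) error) func(obj interface{}) {
	return func(obj interface{}) {
		n, ok := obj.(*node)
		if !ok {
			fmt.Println("unexpected object type")
			return
		}
		if err := f(n); err != nil {
			fmt.Println("add handler error:", err)
		}
	}
}

func main() {
	addFunc := createAddNodeHandler(func(n *node) error {
		fmt.Println("node added:", n.Name)
		return nil
	})
	// An informer would invoke this with an untyped cache object.
	addFunc(&node{Name: "worker-1"})
}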