diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 9c3b6ef699a..ec464becb36 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -63,6 +63,7 @@ go_library( "//pkg/controller/job:go_default_library", "//pkg/controller/namespace:go_default_library", "//pkg/controller/node:go_default_library", + "//pkg/controller/node/ipam:go_default_library", "//pkg/controller/podautoscaler:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", "//pkg/controller/podgc:go_default_library", diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 5dd170140af..4aca6e0e614 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/controller/garbagecollector" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" nodecontroller "k8s.io/kubernetes/pkg/controller/node" + "k8s.io/kubernetes/pkg/controller/node/ipam" "k8s.io/kubernetes/pkg/controller/podgc" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" @@ -108,7 +109,7 @@ func startNodeController(ctx ControllerContext) (bool, error) { serviceCIDR, int(ctx.Options.NodeCIDRMaskSize), ctx.Options.AllocateNodeCIDRs, - nodecontroller.CIDRAllocatorType(ctx.Options.CIDRAllocatorType), + ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType), ctx.Options.EnableTaintManager, utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions), ) diff --git a/pkg/controller/node/BUILD b/pkg/controller/node/BUILD index d580fdf4cd1..958b1a2264b 100644 --- a/pkg/controller/node/BUILD +++ b/pkg/controller/node/BUILD @@ -10,25 +10,20 @@ load( go_test( name = "go_default_test", - srcs = [ - "cidr_allocator_test.go", - "cidr_set_test.go", - "nodecontroller_test.go", - "rate_limited_queue_test.go", - "taint_controller_test.go", - "timed_workers_test.go", - ], + srcs = ["nodecontroller_test.go"], library = ":go_default_library", tags = ["automanaged"], deps = [ "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/controller:go_default_library", + "//pkg/controller/node/ipam:go_default_library", + "//pkg/controller/node/scheduler:go_default_library", + "//pkg/controller/node/util:go_default_library", "//pkg/controller/testutil:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/util/node:go_default_library", "//pkg/util/taints:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", @@ -36,7 +31,6 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", @@ -44,35 +38,24 @@ go_test( "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", - 
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) go_library( name = "go_default_library", srcs = [ - "cidr_allocator.go", - "cidr_set.go", - "cloud_cidr_allocator.go", - "controller_utils.go", "doc.go", "metrics.go", "node_controller.go", - "range_allocator.go", - "rate_limited_queue.go", - "taint_controller.go", - "timed_workers.go", ], tags = ["automanaged"], deps = [ - "//pkg/api:go_default_library", - "//pkg/api/helper:go_default_library", - "//pkg/api/v1/helper:go_default_library", "//pkg/api/v1/node:go_default_library", "//pkg/cloudprovider:go_default_library", - "//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/controller:go_default_library", - "//pkg/kubelet/util/format:go_default_library", + "//pkg/controller/node/ipam:go_default_library", + "//pkg/controller/node/scheduler:go_default_library", + "//pkg/controller/node/util:go_default_library", "//pkg/util/metrics:go_default_library", "//pkg/util/node:go_default_library", "//pkg/util/system:go_default_library", @@ -88,9 +71,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library", @@ -102,7 +83,6 @@ go_library( "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) @@ -115,6 +95,11 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/controller/node/ipam:all-srcs", + "//pkg/controller/node/scheduler:all-srcs", + "//pkg/controller/node/util:all-srcs", + ], tags = ["automanaged"], ) diff --git a/pkg/controller/node/ipam/BUILD b/pkg/controller/node/ipam/BUILD new file mode 100644 index 00000000000..cd39c6d0924 --- /dev/null +++ b/pkg/controller/node/ipam/BUILD @@ -0,0 +1,68 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["cidr_allocator_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/controller/testutil:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "cidr_allocator.go", + "cloud_cidr_allocator.go", + "range_allocator.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/cloudprovider/providers/gce:go_default_library", + "//pkg/controller/node/ipam/cidrset:go_default_library", + "//pkg/controller/node/util:go_default_library", + "//pkg/util/node:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + 
"//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/controller/node/ipam/cidrset:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/controller/node/ipam/OWNERS b/pkg/controller/node/ipam/OWNERS new file mode 100755 index 00000000000..8164b49a07a --- /dev/null +++ b/pkg/controller/node/ipam/OWNERS @@ -0,0 +1,7 @@ +approvers: +- bowei +- dnardo +reviewers: +- bowei +- dnardo +- freehan diff --git a/pkg/controller/node/cidr_allocator.go b/pkg/controller/node/ipam/cidr_allocator.go similarity index 84% rename from pkg/controller/node/cidr_allocator.go rename to pkg/controller/node/ipam/cidr_allocator.go index cfc30c7b28f..e6c7f47869c 100644 --- a/pkg/controller/node/cidr_allocator.go +++ b/pkg/controller/node/ipam/cidr_allocator.go @@ -14,18 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package ipam import ( - "errors" "net" v1 "k8s.io/api/core/v1" ) -var errCIDRRangeNoCIDRsRemaining = errors.New( - "CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range") - type nodeAndCIDR struct { cidr *net.IPNet nodeName string @@ -35,7 +31,11 @@ type nodeAndCIDR struct { type CIDRAllocatorType string const ( + // RangeAllocatorType is the allocator that uses an internal CIDR + // range allocator to do node CIDR range allocations. RangeAllocatorType CIDRAllocatorType = "RangeAllocator" + // CloudAllocatorType is the allocator that uses cloud platform + // support to do node CIDR range allocations. CloudAllocatorType CIDRAllocatorType = "CloudAllocator" ) diff --git a/pkg/controller/node/cidr_allocator_test.go b/pkg/controller/node/ipam/cidr_allocator_test.go similarity index 98% rename from pkg/controller/node/cidr_allocator_test.go rename to pkg/controller/node/ipam/cidr_allocator_test.go index 7d2ffb69d1d..3f911a899b3 100644 --- a/pkg/controller/node/cidr_allocator_test.go +++ b/pkg/controller/node/ipam/cidr_allocator_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package node +package ipam import ( "net" @@ -143,7 +143,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { return } rangeAllocator.recorder = testutil.NewFakeRecorder() - if err = rangeAllocator.cidrs.occupy(cidr); err != nil { + if err = rangeAllocator.cidrs.Occupy(cidr); err != nil { t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) } } @@ -225,7 +225,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) { return } rangeAllocator.recorder = testutil.NewFakeRecorder() - err = rangeAllocator.cidrs.occupy(cidr) + err = rangeAllocator.cidrs.Occupy(cidr) if err != nil { t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) } @@ -337,7 +337,7 @@ func TestReleaseCIDRSuccess(t *testing.T) { return } rangeAllocator.recorder = testutil.NewFakeRecorder() - err = rangeAllocator.cidrs.occupy(cidr) + err = rangeAllocator.cidrs.Occupy(cidr) if err != nil { t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) } diff --git a/pkg/controller/node/ipam/cidrset/BUILD b/pkg/controller/node/ipam/cidrset/BUILD new file mode 100644 index 00000000000..1a0e4cc5794 --- /dev/null +++ b/pkg/controller/node/ipam/cidrset/BUILD @@ -0,0 +1,36 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["cidr_set_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = ["//vendor/github.com/golang/glog:go_default_library"], +) + +go_library( + name = "go_default_library", + srcs = ["cidr_set.go"], + tags = ["automanaged"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/controller/node/cidr_set.go b/pkg/controller/node/ipam/cidrset/cidr_set.go similarity index 82% rename from pkg/controller/node/cidr_set.go rename to pkg/controller/node/ipam/cidrset/cidr_set.go index 6da63d4281d..ce08dd8af3f 100644 --- a/pkg/controller/node/cidr_set.go +++ b/pkg/controller/node/ipam/cidrset/cidr_set.go @@ -14,17 +14,20 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package cidrset import ( "encoding/binary" + "errors" "fmt" "math/big" "net" "sync" ) -type cidrSet struct { +// CidrSet manages a set of CIDR ranges from which blocks of IPs can +// be allocated from. +type CidrSet struct { sync.Mutex clusterCIDR *net.IPNet clusterIP net.IP @@ -44,7 +47,15 @@ const ( maxPrefixLength = 64 ) -func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet { +var ( + // ErrCIDRRangeNoCIDRsRemaining occurs when there are no more space + // to allocate CIDR ranges. + ErrCIDRRangeNoCIDRsRemaining = errors.New( + "CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range") +) + +// NewCIDRSet creates a new CidrSet. 
+func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *CidrSet { clusterMask := clusterCIDR.Mask clusterMaskSize, _ := clusterMask.Size() @@ -54,7 +65,7 @@ func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet { } else { maxCIDRs = 1 << uint32(subNetMaskSize-clusterMaskSize) } - return &cidrSet{ + return &CidrSet{ clusterCIDR: clusterCIDR, clusterIP: clusterCIDR.IP, clusterMaskSize: clusterMaskSize, @@ -63,7 +74,7 @@ func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet { } } -func (s *cidrSet) indexToCIDRBlock(index int) *net.IPNet { +func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet { var ip []byte var mask int switch /*v4 or v6*/ { @@ -91,7 +102,8 @@ func (s *cidrSet) indexToCIDRBlock(index int) *net.IPNet { } } -func (s *cidrSet) allocateNext() (*net.IPNet, error) { +// AllocateNext allocates the next free CIDR range. +func (s *CidrSet) AllocateNext() (*net.IPNet, error) { s.Lock() defer s.Unlock() @@ -104,7 +116,7 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) { } } if nextUnused == -1 { - return nil, errCIDRRangeNoCIDRsRemaining + return nil, ErrCIDRRangeNoCIDRsRemaining } s.nextCandidate = (nextUnused + 1) % s.maxCIDRs @@ -113,7 +125,7 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) { return s.indexToCIDRBlock(nextUnused), nil } -func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) { +func (s *CidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) { begin, end = 0, s.maxCIDRs-1 cidrMask := cidr.Mask maskSize, _ := cidrMask.Size() @@ -160,7 +172,8 @@ func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err return begin, end, nil } -func (s *cidrSet) release(cidr *net.IPNet) error { +// Release releases the given CIDR range. +func (s *CidrSet) Release(cidr *net.IPNet) error { begin, end, err := s.getBeginingAndEndIndices(cidr) if err != nil { return err @@ -173,7 +186,8 @@ func (s *cidrSet) release(cidr *net.IPNet) error { return nil } -func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { +// Occupy marks the given CIDR range as used. +func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) { begin, end, err := s.getBeginingAndEndIndices(cidr) if err != nil { return err @@ -188,7 +202,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { return nil } -func (s *cidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) { +func (s *CidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) { var cidrIndex uint32 if cidr.IP.To4() != nil { cidrIndex = (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-s.subNetMaskSize) diff --git a/pkg/controller/node/cidr_set_test.go b/pkg/controller/node/ipam/cidrset/cidr_set_test.go similarity index 93% rename from pkg/controller/node/cidr_set_test.go rename to pkg/controller/node/ipam/cidrset/cidr_set_test.go index 6c9f6c54d87..fbd9886e53d 100644 --- a/pkg/controller/node/cidr_set_test.go +++ b/pkg/controller/node/ipam/cidrset/cidr_set_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package node +package cidrset import ( "math/big" @@ -47,9 +47,9 @@ func TestCIDRSetFullyAllocated(t *testing.T) { } for _, tc := range cases { _, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr) - a := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + a := NewCIDRSet(clusterCIDR, tc.subNetMaskSize) - p, err := a.allocateNext() + p, err := a.AllocateNext() if err != nil { t.Fatalf("unexpected error: %v for %v", err, tc.description) } @@ -58,14 +58,14 @@ func TestCIDRSetFullyAllocated(t *testing.T) { p.String(), tc.expectedCIDR, tc.description) } - _, err = a.allocateNext() + _, err = a.AllocateNext() if err == nil { t.Fatalf("expected error because of fully-allocated range for %v", tc.description) } - a.release(p) + a.Release(p) - p, err = a.allocateNext() + p, err = a.AllocateNext() if err != nil { t.Fatalf("unexpected error: %v for %v", err, tc.description) } @@ -73,7 +73,7 @@ func TestCIDRSetFullyAllocated(t *testing.T) { t.Fatalf("unexpected allocated cidr: %v, expecting %v for %v", p.String(), tc.expectedCIDR, tc.description) } - _, err = a.allocateNext() + _, err = a.AllocateNext() if err == nil { t.Fatalf("expected error because of fully-allocated range for %v", tc.description) } @@ -133,7 +133,7 @@ func TestIndexToCIDRBlock(t *testing.T) { } for _, tc := range cases { _, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr) - a := newCIDRSet(clusterCIDR, tc.subnetMaskSize) + a := NewCIDRSet(clusterCIDR, tc.subnetMaskSize) cidr := a.indexToCIDRBlock(tc.index) if cidr.String() != tc.CIDRBlock { t.Fatalf("error for %v index %d %s", tc.description, tc.index, cidr.String()) @@ -157,12 +157,12 @@ func TestCIDRSet_RandomishAllocation(t *testing.T) { } for _, tc := range cases { _, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr) - a := newCIDRSet(clusterCIDR, 24) + a := NewCIDRSet(clusterCIDR, 24) // allocate all the CIDRs var cidrs []*net.IPNet for i := 0; i < 256; i++ { - if c, err := a.allocateNext(); err == nil { + if c, err := a.AllocateNext(); err == nil { cidrs = append(cidrs, c) } else { t.Fatalf("unexpected error: %v for %v", err, tc.description) @@ -170,25 +170,25 @@ func TestCIDRSet_RandomishAllocation(t *testing.T) { } var err error - _, err = a.allocateNext() + _, err = a.AllocateNext() if err == nil { t.Fatalf("expected error because of fully-allocated range for %v", tc.description) } // release them all for i := 0; i < len(cidrs); i++ { - a.release(cidrs[i]) + a.Release(cidrs[i]) } // allocate the CIDRs again var rcidrs []*net.IPNet for i := 0; i < 256; i++ { - if c, err := a.allocateNext(); err == nil { + if c, err := a.AllocateNext(); err == nil { rcidrs = append(rcidrs, c) } else { t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description) } } - _, err = a.allocateNext() + _, err = a.AllocateNext() if err == nil { t.Fatalf("expected error because of fully-allocated range for %v", tc.description) } @@ -215,14 +215,14 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) { } for _, tc := range cases { _, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr) - a := newCIDRSet(clusterCIDR, 24) + a := NewCIDRSet(clusterCIDR, 24) // allocate all the CIDRs var cidrs []*net.IPNet - var num_cidrs = 256 + var numCIDRs = 256 - for i := 0; i < num_cidrs; i++ { - if c, err := a.allocateNext(); err == nil { + for i := 0; i < numCIDRs; i++ { + if c, err := a.AllocateNext(); err == nil { cidrs = append(cidrs, c) } else { t.Fatalf("unexpected error: %v for %v", err, tc.description) @@ -230,35 +230,35 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) { } var err error - _, err = 
a.allocateNext() + _, err = a.AllocateNext() if err == nil { t.Fatalf("expected error because of fully-allocated range for %v", tc.description) } // release them all for i := 0; i < len(cidrs); i++ { - a.release(cidrs[i]) + a.Release(cidrs[i]) } // occupy the last 128 CIDRs - for i := num_cidrs / 2; i < num_cidrs; i++ { - a.occupy(cidrs[i]) + for i := numCIDRs / 2; i < numCIDRs; i++ { + a.Occupy(cidrs[i]) } // allocate the first 128 CIDRs again var rcidrs []*net.IPNet - for i := 0; i < num_cidrs/2; i++ { - if c, err := a.allocateNext(); err == nil { + for i := 0; i < numCIDRs/2; i++ { + if c, err := a.AllocateNext(); err == nil { rcidrs = append(rcidrs, c) } else { t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description) } } - _, err = a.allocateNext() + _, err = a.AllocateNext() if err == nil { t.Fatalf("expected error because of fully-allocated range for %v", tc.description) } // check Occupy() work properly - for i := num_cidrs / 2; i < num_cidrs; i++ { + for i := numCIDRs / 2; i < numCIDRs; i++ { rcidrs = append(rcidrs, cidrs[i]) } if !reflect.DeepEqual(cidrs, rcidrs) { @@ -394,7 +394,7 @@ func TestGetBitforCIDR(t *testing.T) { t.Fatalf("unexpected error: %v for %v", err, tc.description) } - cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + cs := NewCIDRSet(clusterCIDR, tc.subNetMaskSize) _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) if err != nil { @@ -562,14 +562,14 @@ func TestOccupy(t *testing.T) { t.Fatalf("unexpected error: %v for %v", err, tc.description) } - cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + cs := NewCIDRSet(clusterCIDR, tc.subNetMaskSize) _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) if err != nil { t.Fatalf("unexpected error: %v for %v", err, tc.description) } - err = cs.occupy(subnetCIDR) + err = cs.Occupy(subnetCIDR) if err == nil && tc.expectErr { t.Errorf("expected error but got none for %v", tc.description) continue @@ -629,9 +629,9 @@ func TestCIDRSetv6(t *testing.T) { } for _, tc := range cases { _, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr) - a := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + a := NewCIDRSet(clusterCIDR, tc.subNetMaskSize) - p, err := a.allocateNext() + p, err := a.AllocateNext() if err == nil && tc.expectErr { t.Errorf("expected error but got none for %v", tc.description) continue @@ -645,7 +645,7 @@ func TestCIDRSetv6(t *testing.T) { t.Fatalf("unexpected allocated cidr: %s for %v", p.String(), tc.description) } } - p2, err := a.allocateNext() + p2, err := a.AllocateNext() if !tc.expectErr { if p2.String() != tc.expectedCIDR2 { t.Fatalf("unexpected allocated cidr: %s for %v", p2.String(), tc.description) diff --git a/pkg/controller/node/cloud_cidr_allocator.go b/pkg/controller/node/ipam/cloud_cidr_allocator.go similarity index 94% rename from pkg/controller/node/cloud_cidr_allocator.go rename to pkg/controller/node/ipam/cloud_cidr_allocator.go index 85b4c9e8439..9dfb9543f83 100644 --- a/pkg/controller/node/cloud_cidr_allocator.go +++ b/pkg/controller/node/ipam/cloud_cidr_allocator.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package node +package ipam import ( "fmt" @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + "k8s.io/kubernetes/pkg/controller/node/util" nodeutil "k8s.io/kubernetes/pkg/util/node" ) @@ -50,6 +51,7 @@ type cloudCIDRAllocator struct { var _ CIDRAllocator = (*cloudCIDRAllocator)(nil) +// NewCloudCIDRAllocator creates a new cloud CIDR allocator. func NewCloudCIDRAllocator( client clientset.Interface, cloud cloudprovider.Interface) (ca CIDRAllocator, err error) { @@ -79,12 +81,12 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error { cidrs, err := ca.cloud.AliasRanges(types.NodeName(node.Name)) if err != nil { - recordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } if len(cidrs) == 0 { - recordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") glog.V(2).Infof("Node %v has no CIDRs", node.Name) return fmt.Errorf("failed to allocate cidr (none exist)") } diff --git a/pkg/controller/node/range_allocator.go b/pkg/controller/node/ipam/range_allocator.go similarity index 92% rename from pkg/controller/node/range_allocator.go rename to pkg/controller/node/ipam/range_allocator.go index f5910417e4a..0f150a761a8 100644 --- a/pkg/controller/node/range_allocator.go +++ b/pkg/controller/node/ipam/range_allocator.go @@ -14,13 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package ipam import ( "fmt" "net" "sync" + "github.com/golang/glog" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,7 +33,8 @@ import ( v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" - "github.com/golang/glog" + "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/node/util" ) // TODO: figure out the good setting for those constants. 
@@ -45,7 +48,7 @@ const ( type rangeAllocator struct { client clientset.Interface - cidrs *cidrSet + cidrs *cidrset.CidrSet clusterCIDR *net.IPNet maxCIDRs int // Channel that is used to pass updating Nodes with assigned CIDRs to the background @@ -74,7 +77,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s ra := &rangeAllocator{ client: client, - cidrs: newCIDRSet(clusterCIDR, subNetMaskSize), + cidrs: cidrset.NewCIDRSet(clusterCIDR, subNetMaskSize), clusterCIDR: clusterCIDR, nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), recorder: recorder, @@ -150,7 +153,7 @@ func (r *rangeAllocator) occupyCIDR(node *v1.Node) error { if err != nil { return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) } - if err := r.cidrs.occupy(podCIDR); err != nil { + if err := r.cidrs.Occupy(podCIDR); err != nil { return fmt.Errorf("failed to mark cidr as occupied: %v", err) } return nil @@ -169,10 +172,10 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error { if node.Spec.PodCIDR != "" { return r.occupyCIDR(node) } - podCIDR, err := r.cidrs.allocateNext() + podCIDR, err := r.cidrs.AllocateNext() if err != nil { r.removeNodeFromProcessing(node.Name) - recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + util.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } @@ -194,7 +197,7 @@ func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error { } glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR) - if err = r.cidrs.release(podCIDR); err != nil { + if err = r.cidrs.Release(podCIDR); err != nil { return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err) } return err @@ -212,7 +215,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { return } - if err := r.cidrs.occupy(serviceCIDR); err != nil { + if err := r.cidrs.Occupy(serviceCIDR); err != nil { glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err) } } @@ -232,7 +235,7 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { if node.Spec.PodCIDR != "" { glog.Errorf("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR) if node.Spec.PodCIDR != data.cidr.String() { - if err := r.cidrs.release(data.cidr); err != nil { + if err := r.cidrs.Release(data.cidr); err != nil { glog.Errorf("Error when releasing CIDR %v", data.cidr.String()) } } @@ -246,13 +249,13 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { } } if err != nil { - recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") // We accept the fact that we may leek CIDRs here. This is safer than releasing // them in case when we don't know if request went through. // NodeController restart will return all falsely allocated CIDRs to the pool. if !apierrors.IsServerTimeout(err) { glog.Errorf("CIDR assignment for node %v failed: %v. 
Releasing allocated CIDR", data.nodeName, err) - if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil { + if releaseErr := r.cidrs.Release(data.cidr); releaseErr != nil { glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr) } } diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go index 5a2fa6f89f0..f2725a832c2 100644 --- a/pkg/controller/node/node_controller.go +++ b/pkg/controller/node/node_controller.go @@ -17,12 +17,13 @@ limitations under the License. package node import ( - "errors" "fmt" "net" "sync" "time" + "github.com/golang/glog" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -47,14 +48,15 @@ import ( v1node "k8s.io/kubernetes/pkg/api/v1/node" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/node/ipam" + "k8s.io/kubernetes/pkg/controller/node/scheduler" + "k8s.io/kubernetes/pkg/controller/node/util" "k8s.io/kubernetes/pkg/util/metrics" utilnode "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/system" taintutils "k8s.io/kubernetes/pkg/util/taints" utilversion "k8s.io/kubernetes/pkg/util/version" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" - - "github.com/golang/glog" ) func init() { @@ -63,13 +65,8 @@ func init() { } var ( - ErrCloudInstance = errors.New("cloud provider doesn't support instances.") gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0") - // The minimum kubelet version for which the nodecontroller - // can safely flip pod.Status to NotReady. - podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0") - UnreachableTaintTemplate = &v1.Taint{ Key: algorithm.TaintNodeUnreachable, Effect: v1.TaintEffectNoExecute, @@ -82,12 +79,6 @@ var ( ) const ( - // nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update. - nodeStatusUpdateRetry = 5 - // controls how often NodeController will try to evict Pods from non-responsive Nodes. - nodeEvictionPeriod = 100 * time.Millisecond - // Burst value for all eviction rate limiters - evictionRateLimiterBurst = 1 // The amount of time the nodecontroller polls on the list nodes endpoint. apiserverStartupGracePeriod = 10 * time.Minute // The amount of time the nodecontroller should sleep between retrying NodeStatus updates @@ -111,7 +102,7 @@ type nodeStatusData struct { type NodeController struct { allocateNodeCIDRs bool - allocatorType CIDRAllocatorType + allocatorType ipam.CIDRAllocatorType cloud cloudprovider.Interface clusterCIDR *net.IPNet @@ -150,9 +141,9 @@ type NodeController struct { // Lock to access evictor workers evictorLock sync.Mutex // workers that evicts pods from unresponsive nodes. - zonePodEvictor map[string]*RateLimitedTimedQueue + zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue // workers that are responsible for tainting nodes. - zoneNoExecuteTainer map[string]*RateLimitedTimedQueue + zoneNoExecuteTainer map[string]*scheduler.RateLimitedTimedQueue podEvictionTimeout time.Duration // The maximum duration before a pod evicted from a node can be forcefully terminated. 
maximumGracePeriod time.Duration @@ -166,9 +157,9 @@ type NodeController struct { podInformerSynced cache.InformerSynced - cidrAllocator CIDRAllocator + cidrAllocator ipam.CIDRAllocator - taintManager *NoExecuteTaintManager + taintManager *scheduler.NoExecuteTaintManager forcefullyDeletePod func(*v1.Pod) error nodeExistsInCloudProvider func(types.NodeName) (bool, error) @@ -213,7 +204,7 @@ func NewNodeController( serviceCIDR *net.IPNet, nodeCIDRMaskSize int, allocateNodeCIDRs bool, - allocatorType CIDRAllocatorType, + allocatorType ipam.CIDRAllocatorType, runTaintManager bool, useTaintBasedEvictions bool) (*NodeController, error) { eventBroadcaster := record.NewBroadcaster() @@ -247,8 +238,8 @@ func NewNodeController( recorder: recorder, podEvictionTimeout: podEvictionTimeout, maximumGracePeriod: 5 * time.Minute, - zonePodEvictor: make(map[string]*RateLimitedTimedQueue), - zoneNoExecuteTainer: make(map[string]*RateLimitedTimedQueue), + zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), + zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue), nodeStatusMap: make(map[string]nodeStatusData), nodeMonitorGracePeriod: nodeMonitorGracePeriod, nodeMonitorPeriod: nodeMonitorPeriod, @@ -259,8 +250,8 @@ func NewNodeController( serviceCIDR: serviceCIDR, allocateNodeCIDRs: allocateNodeCIDRs, allocatorType: allocatorType, - forcefullyDeletePod: func(p *v1.Pod) error { return forcefullyDeletePod(kubeClient, p) }, - nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) }, + forcefullyDeletePod: func(p *v1.Pod) error { return util.ForcefullyDeletePod(kubeClient, p) }, + nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return util.NodeExistsInCloudProvider(cloud, nodeName) }, evictionLimiterQPS: evictionLimiterQPS, secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS, largeClusterThreshold: largeClusterThreshold, @@ -334,11 +325,11 @@ func NewNodeController( } switch nc.allocatorType { - case RangeAllocatorType: - nc.cidrAllocator, err = NewCIDRRangeAllocator( + case ipam.RangeAllocatorType: + nc.cidrAllocator, err = ipam.NewCIDRRangeAllocator( kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList) - case CloudAllocatorType: - nc.cidrAllocator, err = NewCloudCIDRAllocator(kubeClient, cloud) + case ipam.CloudAllocatorType: + nc.cidrAllocator, err = ipam.NewCloudCIDRAllocator(kubeClient, cloud) default: return nil, fmt.Errorf("Invalid CIDR allocator type: %v", nc.allocatorType) } @@ -348,8 +339,8 @@ func NewNodeController( } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: createAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR), - UpdateFunc: createUpdateNodeHandler(func(_, newNode *v1.Node) error { + AddFunc: util.CreateAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR), + UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { // If the PodCIDR is not empty we either: // - already processed a Node that already had a CIDR after NC restarted // (cidr is marked as used), @@ -374,26 +365,26 @@ func NewNodeController( } return nil }), - DeleteFunc: createDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR), + DeleteFunc: util.CreateDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR), }) } if nc.runTaintManager { nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: createAddNodeHandler(func(node *v1.Node) error { + AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { 
nc.taintManager.NodeUpdated(nil, node) return nil }), - UpdateFunc: createUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { + UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { nc.taintManager.NodeUpdated(oldNode, newNode) return nil }), - DeleteFunc: createDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: util.CreateDeleteNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(node, nil) return nil }), }) - nc.taintManager = NewNoExecuteTaintManager(kubeClient) + nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient) } nc.nodeLister = nodeInformer.Lister() @@ -410,7 +401,7 @@ func (nc *NodeController) doEvictionPass() { defer nc.evictorLock.Unlock() for k := range nc.zonePodEvictor { // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). - nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) { + nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { node, err := nc.nodeLister.Get(value.Value) if apierrors.IsNotFound(err) { glog.Warningf("Node %v no longer present in nodeLister!", value.Value) @@ -421,7 +412,7 @@ func (nc *NodeController) doEvictionPass() { EvictionsNumber.WithLabelValues(zone).Inc() } nodeUid, _ := value.UID.(string) - remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore) + remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) return false, 0 @@ -439,7 +430,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() { defer nc.evictorLock.Unlock() for k := range nc.zoneNoExecuteTainer { // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). - nc.zoneNoExecuteTainer[k].Try(func(value TimedValue) (bool, time.Duration) { + nc.zoneNoExecuteTainer[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { node, err := nc.nodeLister.Get(value.Value) if apierrors.IsNotFound(err) { glog.Warningf("Node %v no longer present in nodeLister!", value.Value) @@ -468,7 +459,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() { return true, 0 } - return swapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0 + return util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0 }) } } @@ -498,12 +489,12 @@ func (nc *NodeController) Run(stopCh <-chan struct{}) { if nc.useTaintBasedEvictions { // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. - go wait.Until(nc.doNoExecuteTaintingPass, nodeEvictionPeriod, wait.NeverStop) + go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop) } else { // Managing eviction of nodes: // When we delete pods off a node, if the node was not empty at the time we then // queue an eviction watcher. If we hit an error, retry deletion. 
- go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop) + go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop) } <-stopCh @@ -516,12 +507,12 @@ func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) { nc.zoneStates[zone] = stateInitial if !nc.useTaintBasedEvictions { nc.zonePodEvictor[zone] = - NewRateLimitedTimedQueue( - flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst)) + scheduler.NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) } else { nc.zoneNoExecuteTainer[zone] = - NewRateLimitedTimedQueue( - flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst)) + scheduler.NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) } // Init the metric for the new zone. glog.Infof("Initializing eviction metric for zone: %v", zone) @@ -547,7 +538,7 @@ func (nc *NodeController) monitorNodeStatus() error { for i := range added { glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name) - recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name)) + util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name)) nc.knownNodeSet[added[i].Name] = added[i] nc.addPodEvictorForNewZone(added[i]) if nc.useTaintBasedEvictions { @@ -559,7 +550,7 @@ func (nc *NodeController) monitorNodeStatus() error { for i := range deleted { glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name) - recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name)) + util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name)) delete(nc.knownNodeSet, deleted[i].Name) } @@ -574,7 +565,7 @@ func (nc *NodeController) monitorNodeStatus() error { continue } node := nodeCopy.(*v1.Node) - if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) { + if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeStatusUpdateRetry, func() (bool, error) { gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node) if err == nil { return true, nil @@ -605,7 +596,7 @@ func (nc *NodeController) monitorNodeStatus() error { // We want to update the taint straight away if Node is already tainted with the UnreachableTaint if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) { taintToAdd := *NotReadyTaintTemplate - if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) { + if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) { glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. 
Will try again in the next cycle.") } } else if nc.markNodeForTainting(node) { @@ -632,7 +623,7 @@ func (nc *NodeController) monitorNodeStatus() error { // We want to update the taint straight away if Node is already tainted with the UnreachableTaint if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) { taintToAdd := *UnreachableTaintTemplate - if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) { + if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) { glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.") } } else if nc.markNodeForTainting(node) { @@ -672,8 +663,8 @@ func (nc *NodeController) monitorNodeStatus() error { // Report node event. if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue { - recordNodeStatusChange(nc.recorder, node, "NodeNotReady") - if err = markAllPodsNotReady(nc.kubeClient, node); err != nil { + util.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady") + if err = util.MarkAllPodsNotReady(nc.kubeClient, node); err != nil { utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) } } @@ -688,13 +679,13 @@ func (nc *NodeController) monitorNodeStatus() error { } if !exists { glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name) - recordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) + util.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) go func(nodeName string) { defer utilruntime.HandleCrash() // Kubelet is not reporting and Cloud Provider says node // is gone. Delete it without worrying about grace // periods. - if err := forcefullyDeleteNode(nc.kubeClient, nodeName); err != nil { + if err := util.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil { glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err) } }(node.Name) @@ -1122,3 +1113,55 @@ func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeConditi return notReadyNodes, stateNormal } } + +// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating +// that should not be gracefully terminated. +func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + pod, ok = tombstone.Obj.(*v1.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a Pod %#v", obj) + return + } + } + + // consider only terminating pods + if pod.DeletionTimestamp == nil { + return + } + + node, err := nc.nodeLister.Get(pod.Spec.NodeName) + // if there is no such node, do nothing and let the podGC clean it up. + if apierrors.IsNotFound(err) { + return + } + if err != nil { + // this can only happen if the Store.KeyFunc has a problem creating + // a key for the pod. If it happens once, it will happen again so + // don't bother requeuing the pod. 
+ utilruntime.HandleError(err) + return + } + + // delete terminating pods that have been scheduled on + // nodes that do not support graceful termination + // TODO(mikedanese): this can be removed when we no longer + // guarantee backwards compatibility of master API to kubelets with + // versions less than 1.1.0 + v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion) + if err != nil { + glog.V(0).Infof("Couldn't parse version %q of node: %v", node.Status.NodeInfo.KubeletVersion, err) + utilruntime.HandleError(nc.forcefullyDeletePod(pod)) + return + } + if v.LessThan(gracefulDeletionVersion) { + utilruntime.HandleError(nc.forcefullyDeletePod(pod)) + return + } +} diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index e3782631786..33fcb911f04 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -39,6 +39,9 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/node/ipam" + "k8s.io/kubernetes/pkg/controller/node/scheduler" + "k8s.io/kubernetes/pkg/controller/node/util" "k8s.io/kubernetes/pkg/controller/testutil" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/util/node" @@ -103,7 +106,7 @@ func NewNodeControllerFromClient( serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs, - RangeAllocatorType, + ipam.RangeAllocatorType, useTaints, useTaints, ) @@ -637,9 +640,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { zones := testutil.GetZones(item.fakeNodeHandler) for _, zone := range zones { if _, ok := nodeController.zonePodEvictor[zone]; ok { - nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { + nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUid, _ := value.UID.(string) - deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister()) + util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister()) return true, 0 }) } else { @@ -782,9 +785,9 @@ func TestPodStatusChange(t *testing.T) { } zones := testutil.GetZones(item.fakeNodeHandler) for _, zone := range zones { - nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { + nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUid, _ := value.UID.(string) - deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore) + util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore) return true, 0 }) } @@ -1337,9 +1340,9 @@ func (nc *nodeController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) var podEvicted bool zones := testutil.GetZones(fakeNodeHandler) for _, zone := range zones { - nc.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { + nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { uid, _ := value.UID.(string) - deletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore) + util.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore) return true, 0 }) } @@ -2310,7 +2313,7 @@ func TestCheckNodeKubeletVersionParsing(t *testing.T) { }, }, } - isOutdated := 
nodeRunningOutdatedKubelet(n) + isOutdated := util.NodeRunningOutdatedKubelet(n) if ov.outdated != isOutdated { t.Errorf("Version %v doesn't match test expectation. Expected outdated %v got %v", n.Status.NodeInfo.KubeletVersion, ov.outdated, isOutdated) } else { diff --git a/pkg/controller/node/scheduler/BUILD b/pkg/controller/node/scheduler/BUILD new file mode 100644 index 00000000000..017c5d5dbb5 --- /dev/null +++ b/pkg/controller/node/scheduler/BUILD @@ -0,0 +1,69 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = [ + "rate_limited_queue_test.go", + "taint_controller_test.go", + "timed_workers_test.go", + ], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/controller/testutil:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "rate_limited_queue.go", + "taint_controller.go", + "timed_workers.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api/helper:go_default_library", + "//pkg/api/v1/helper:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/scheduler/rate_limited_queue.go similarity index 67% rename from pkg/controller/node/rate_limited_queue.go rename to pkg/controller/node/scheduler/rate_limited_queue.go index 05a1273f3d7..984443ff52b 100644 --- a/pkg/controller/node/rate_limited_queue.go +++ b/pkg/controller/node/scheduler/rate_limited_queue.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package scheduler import ( "container/heap" @@ -27,6 +27,18 @@ import ( "github.com/golang/glog" ) +const ( + // NodeStatusUpdateRetry controls the number of retries of writing + // NodeStatus update. + NodeStatusUpdateRetry = 5 + // NodeEvictionPeriod controls how often NodeController will try to + // evict Pods from non-responsive Nodes. 
+ NodeEvictionPeriod = 100 * time.Millisecond + // EvictionRateLimiterBurst is the burst value for all eviction rate + // limiters + EvictionRateLimiterBurst = 1 +) + // TimedValue is a value that should be processed at a designated time. type TimedValue struct { Value string @@ -37,19 +49,26 @@ type TimedValue struct { } // now is used to test time -var now func() time.Time = time.Now +var now = time.Now // TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue type TimedQueue []*TimedValue -func (h TimedQueue) Len() int { return len(h) } -func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) } -func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +// Len is the length of the queue. +func (h TimedQueue) Len() int { return len(h) } +// Less returns true if queue[i] < queue[j]. +func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) } + +// Swap swaps index i and j. +func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +// Push a new TimedValue on to the queue. func (h *TimedQueue) Push(x interface{}) { *h = append(*h, x.(*TimedValue)) } +// Pop the lowest ProcessAt item. func (h *TimedQueue) Pop() interface{} { old := *h n := len(old) @@ -58,16 +77,17 @@ func (h *TimedQueue) Pop() interface{} { return x } -// A FIFO queue which additionally guarantees that any element can be added only once until -// it is removed. +// UniqueQueue is a FIFO queue which additionally guarantees that any +// element can be added only once until it is removed. type UniqueQueue struct { lock sync.Mutex queue TimedQueue set sets.String } -// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the -// Remove call. Returns true if new value was added. +// Add a new value to the queue if it wasn't added before, or was +// explicitly removed by the Remove call. Returns true if new value +// was added. func (q *UniqueQueue) Add(value TimedValue) bool { q.lock.Lock() defer q.lock.Unlock() @@ -80,8 +100,9 @@ func (q *UniqueQueue) Add(value TimedValue) bool { return true } -// Replace replaces an existing value in the queue if it already exists, otherwise it does nothing. -// Returns true if the item was found. +// Replace replaces an existing value in the queue if it already +// exists, otherwise it does nothing. Returns true if the item was +// found. func (q *UniqueQueue) Replace(value TimedValue) bool { q.lock.Lock() defer q.lock.Unlock() @@ -97,8 +118,9 @@ func (q *UniqueQueue) Replace(value TimedValue) bool { return false } -// Removes the value from the queue, but keeps it in the set, so it won't be added second time. -// Returns true if something was removed. +// RemoveFromQueue the value from the queue, but keeps it in the set, +// so it won't be added second time. Returns true if something was +// removed. func (q *UniqueQueue) RemoveFromQueue(value string) bool { q.lock.Lock() defer q.lock.Unlock() @@ -115,8 +137,9 @@ func (q *UniqueQueue) RemoveFromQueue(value string) bool { return false } -// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition -// of the given value. If the value is not present does nothing and returns false. +// Remove the value from the queue, so Get() call won't return it, and +// allow subsequent addition of the given value. If the value is not +// present does nothing and returns false. 
func (q *UniqueQueue) Remove(value string) bool { q.lock.Lock() defer q.lock.Unlock() @@ -134,7 +157,7 @@ func (q *UniqueQueue) Remove(value string) bool { return true } -// Returns the oldest added value that wasn't returned yet. +// Get returns the oldest added value that wasn't returned yet. func (q *UniqueQueue) Get() (TimedValue, bool) { q.lock.Lock() defer q.lock.Unlock() @@ -146,7 +169,8 @@ func (q *UniqueQueue) Get() (TimedValue, bool) { return *result, true } -// Head returns the oldest added value that wasn't returned yet without removing it. +// Head returns the oldest added value that wasn't returned yet +// without removing it. func (q *UniqueQueue) Head() (TimedValue, bool) { q.lock.Lock() defer q.lock.Unlock() @@ -157,7 +181,8 @@ func (q *UniqueQueue) Head() (TimedValue, bool) { return *result, true } -// Clear removes all items from the queue and duplication preventing set. +// Clear removes all items from the queue and duplication preventing +// set. func (q *UniqueQueue) Clear() { q.lock.Lock() defer q.lock.Unlock() @@ -169,15 +194,16 @@ func (q *UniqueQueue) Clear() { } } -// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time -// of execution. It is also rate limited. +// RateLimitedTimedQueue is a unique item priority queue ordered by +// the expected next time of execution. It is also rate limited. type RateLimitedTimedQueue struct { queue UniqueQueue limiterLock sync.Mutex limiter flowcontrol.RateLimiter } -// Creates new queue which will use given RateLimiter to oversee execution. +// NewRateLimitedTimedQueue creates new queue which will use given +// RateLimiter to oversee execution. func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue { return &RateLimitedTimedQueue{ queue: UniqueQueue{ @@ -188,18 +214,21 @@ func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimed } } -// ActionFunc takes a timed value and returns false if the item must be retried, with an optional -// time.Duration if some minimum wait interval should be used. +// ActionFunc takes a timed value and returns false if the item must +// be retried, with an optional time.Duration if some minimum wait +// interval should be used. type ActionFunc func(TimedValue) (bool, time.Duration) -// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true. -// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true, -// otherwise it is added back to the queue. The returned remaining is used to identify the minimum -// time to execute the next item in the queue. The same value is processed only once unless -// Remove is explicitly called on it (it's done by the cancelPodEviction function in NodeController -// when Node becomes Ready again) -// TODO: figure out a good way to do garbage collection for all Nodes that were removed from -// the cluster. +// Try processes the queue.Ends prematurely if RateLimiter forbids an +// action and leak is true. Otherwise, requeues the item to be +// processed. Each value is processed once if fn returns true, +// otherwise it is added back to the queue. The returned remaining is +// used to identify the minimum time to execute the next item in the +// queue. 
The same value is processed only once unless Remove is +// explicitly called on it (it's done by the cancelPodEviction +// function in NodeController when Node becomes Ready again). TODO: +// figure out a good way to do garbage collection for all Nodes that +// were removed from the cluster. func (q *RateLimitedTimedQueue) Try(fn ActionFunc) { val, ok := q.queue.Head() q.limiterLock.Lock() @@ -227,8 +256,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) { } } -// Adds value to the queue to be processed. Won't add the same value(comparsion by value) a second time -// if it was already added and not removed. +// Add value to the queue to be processed. Won't add the same +// value (comparison by value) a second time if it was already added +// and not removed. func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool { now := now() return q.queue.Add(TimedValue{ @@ -239,17 +269,19 @@ func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool { }) } -// Removes Node from the Evictor. The Node won't be processed until added again. +// Remove Node from the Evictor. The Node won't be processed until +// added again. func (q *RateLimitedTimedQueue) Remove(value string) bool { return q.queue.Remove(value) } -// Removes all items from the queue +// Clear removes all items from the queue. func (q *RateLimitedTimedQueue) Clear() { q.queue.Clear() } -// SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ. +// SwapLimiter safely swaps current limiter for this queue with the +// passed one if capacities or qps's differ. func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) { q.limiterLock.Lock() defer q.limiterLock.Unlock() @@ -260,7 +292,7 @@ func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) { if newQPS <= 0 { newLimiter = flowcontrol.NewFakeNeverRateLimiter() } else { - newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst) + newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, EvictionRateLimiterBurst) } // If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1 // TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep: diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/scheduler/rate_limited_queue_test.go similarity index 99% rename from pkg/controller/node/rate_limited_queue_test.go rename to pkg/controller/node/scheduler/rate_limited_queue_test.go index 6037c32e5b1..644b6569039 100644 --- a/pkg/controller/node/rate_limited_queue_test.go +++ b/pkg/controller/node/scheduler/rate_limited_queue_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package scheduler import ( "reflect" diff --git a/pkg/controller/node/taint_controller.go b/pkg/controller/node/scheduler/taint_controller.go similarity index 98% rename from pkg/controller/node/taint_controller.go rename to pkg/controller/node/scheduler/taint_controller.go index 1603e433fff..9892defc5fe 100644 --- a/pkg/controller/node/taint_controller.go +++ b/pkg/controller/node/scheduler/taint_controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License.
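The SwapLimiter hunk keeps the existing rule that a non-positive QPS disables eviction by swapping in a never-admitting limiter, while a positive QPS gets a token bucket with the now exported EvictionRateLimiterBurst. A small sketch of that selection, using the two flowcontrol constructors already referenced in this file; the helper name is invented for illustration:

package main

import "k8s.io/client-go/util/flowcontrol"

// pickEvictionLimiter is a hypothetical helper mirroring the branch in
// SwapLimiter: a non-positive QPS effectively pauses evictions, otherwise a
// token bucket with the given burst (EvictionRateLimiterBurst, i.e. 1) is used.
func pickEvictionLimiter(qps float32, burst int) flowcontrol.RateLimiter {
	if qps <= 0 {
		// Never admits an action.
		return flowcontrol.NewFakeNeverRateLimiter()
	}
	return flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}

func main() {
	_ = pickEvictionLimiter(0, 1)   // evictions paused
	_ = pickEvictionLimiter(0.1, 1) // token-bucket limited evictions
}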
*/ -package node +package scheduler import ( "fmt" @@ -122,7 +122,7 @@ func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, er time.Sleep(100 * time.Millisecond) } if err != nil { - return []v1.Pod{}, fmt.Errorf("Failed to get Pods assigned to node %v. Skipping update.", nodeName) + return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName) } return pods.Items, nil } @@ -325,9 +325,8 @@ func (tc *NoExecuteTaintManager) processPodOnNode( startTime = scheduledEviction.CreatedAt if startTime.Add(minTolerationTime).Before(triggerTime) { return - } else { - tc.cancelWorkWithEvent(podNamespacedName) } + tc.cancelWorkWithEvent(podNamespacedName) } tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime) } diff --git a/pkg/controller/node/taint_controller_test.go b/pkg/controller/node/scheduler/taint_controller_test.go similarity index 99% rename from pkg/controller/node/taint_controller_test.go rename to pkg/controller/node/scheduler/taint_controller_test.go index 6ce5eee4705..1e6a0e2d5b4 100644 --- a/pkg/controller/node/taint_controller_test.go +++ b/pkg/controller/node/scheduler/taint_controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package scheduler import ( "fmt" diff --git a/pkg/controller/node/timed_workers.go b/pkg/controller/node/scheduler/timed_workers.go similarity index 99% rename from pkg/controller/node/timed_workers.go rename to pkg/controller/node/scheduler/timed_workers.go index d61a63f84ec..2eef59b041b 100644 --- a/pkg/controller/node/timed_workers.go +++ b/pkg/controller/node/scheduler/timed_workers.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package scheduler import ( "sync" diff --git a/pkg/controller/node/timed_workers_test.go b/pkg/controller/node/scheduler/timed_workers_test.go similarity index 99% rename from pkg/controller/node/timed_workers_test.go rename to pkg/controller/node/scheduler/timed_workers_test.go index 08761371dba..6003b07a697 100644 --- a/pkg/controller/node/timed_workers_test.go +++ b/pkg/controller/node/scheduler/timed_workers_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
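Two of the taint_controller.go changes above are pure style fixes: the error string is lowercased and loses its trailing sentence so it wraps cleanly in callers, and the redundant else after a return is dropped. Roughly, with placeholder helpers:

package main

import "fmt"

// getPodsError mirrors the reworded error above: lowercase, no trailing
// period, so callers can wrap it without awkward punctuation.
func getPodsError(nodeName string) error {
	return fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
}

// evictionDecision mirrors the else-removal in processPodOnNode: the early
// return already ends the branch, so no else is needed.
func evictionDecision(tolerationStillValid bool) string {
	if tolerationStillValid {
		return "keep existing eviction schedule"
	}
	return "cancel scheduled work and requeue"
}

func main() {
	fmt.Println(getPodsError("node-1"))
	fmt.Println(evictionDecision(false))
}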
*/ -package node +package scheduler import ( "sync" diff --git a/pkg/controller/node/util/BUILD b/pkg/controller/node/util/BUILD new file mode 100644 index 00000000000..8db6c485c4b --- /dev/null +++ b/pkg/controller/node/util/BUILD @@ -0,0 +1,48 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["controller_utils.go"], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/kubelet/util/format:go_default_library", + "//pkg/util/node:go_default_library", + "//pkg/util/version:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/controller/node/controller_utils.go b/pkg/controller/node/util/controller_utils.go similarity index 72% rename from pkg/controller/node/controller_utils.go rename to pkg/controller/node/util/controller_utils.go index 9b5666d5861..8365a0686a9 100644 --- a/pkg/controller/node/controller_utils.go +++ b/pkg/controller/node/util/controller_utils.go @@ -14,13 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package util import ( + "errors" "fmt" "strings" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" @@ -44,9 +45,20 @@ import ( "github.com/golang/glog" ) -// deletePods will delete all pods from master running on given node, and return true -// if any pods were deleted, or were found pending deletion. -func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) { +var ( + // ErrCloudInstance occurs when the cloud provider does not support + // the Instances API. + ErrCloudInstance = errors.New("cloud provider doesn't support instances") + // podStatusReconciliationVersion is the minimum kubelet version + // for which the nodecontroller can safely flip pod.Status to + // NotReady. + podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0") +) + +// DeletePods will delete all pods from master running on given node, +// and return true if any pods were deleted, or were found pending +// deletion.
+func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) { remaining := false selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String() options := metav1.ListOptions{FieldSelector: selector} @@ -58,7 +70,7 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n } if len(pods.Items) > 0 { - recordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) + RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) } for _, pod := range pods.Items { @@ -68,8 +80,8 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n } // Set reason and message in the pod object. - if _, err = setPodTerminationReason(kubeClient, &pod, nodeName); err != nil { - if errors.IsConflict(err) { + if _, err = SetPodTerminationReason(kubeClient, &pod, nodeName); err != nil { + if apierrors.IsConflict(err) { updateErrList = append(updateErrList, fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), err)) continue @@ -100,9 +112,10 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n return remaining, nil } -// setPodTerminationReason attempts to set a reason and message in the pod status, updates it in the apiserver, -// and returns an error if it encounters one. -func setPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) { +// SetPodTerminationReason attempts to set a reason and message in the +// pod status, updates it in the apiserver, and returns an error if it +// encounters one. +func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) { if pod.Status.Reason == nodepkg.NodeUnreachablePodReason { return pod, nil } @@ -118,7 +131,8 @@ func setPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa return updatedPod, nil } -func forcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error { +// ForcefullyDeletePod deletes the pod immediately. +func ForcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error { var zero int64 glog.Infof("NodeController is force deleting Pod: %v:%v", pod.Namespace, pod.Name) err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &zero}) @@ -128,75 +142,23 @@ func forcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error { return err } -// forcefullyDeleteNode immediately the node. The pods on the node are cleaned -// up by the podGC. -func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error { +// ForcefullyDeleteNode deletes the node immediately. The pods on the +// node are cleaned up by the podGC. +func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error { if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil { return fmt.Errorf("unable to delete node %q: %v", nodeName, err) } return nil } -// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating -// that should not be gracefully terminated. 
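With controller_utils.go moved into the util package and its helpers exported, callers in the node controller are expected to go through the package name. A hedged sketch of such a call site, assuming the new import path k8s.io/kubernetes/pkg/controller/node/util and taking the client, recorder and lister as already-wired parameters; the function name below is hypothetical:

package main

import (
	"github.com/golang/glog"

	clientset "k8s.io/client-go/kubernetes"
	extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
	"k8s.io/client-go/tools/record"

	"k8s.io/kubernetes/pkg/controller/node/util"
)

// evictAllPods is a hypothetical caller showing how the exported helper can
// be used after the move; the arguments are assumed to be wired up by the
// node controller as before.
func evictAllPods(kc clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonSets extensionslisters.DaemonSetLister) {
	remaining, err := util.DeletePods(kc, recorder, nodeName, nodeUID, daemonSets)
	if err != nil {
		glog.Errorf("unable to evict all pods from node %v: %v", nodeName, err)
		return
	}
	if remaining {
		// Some pods are still pending deletion; a real caller would requeue the node.
		glog.V(2).Infof("pods remaining on node %v, will retry", nodeName)
	}
}

func main() {}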
-func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) { - pod, ok := obj.(*v1.Pod) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("Couldn't get object from tombstone %#v", obj) - return - } - pod, ok = tombstone.Obj.(*v1.Pod) - if !ok { - glog.Errorf("Tombstone contained object that is not a Pod %#v", obj) - return - } - } - - // consider only terminating pods - if pod.DeletionTimestamp == nil { - return - } - - node, err := nc.nodeLister.Get(pod.Spec.NodeName) - // if there is no such node, do nothing and let the podGC clean it up. - if errors.IsNotFound(err) { - return - } - if err != nil { - // this can only happen if the Store.KeyFunc has a problem creating - // a key for the pod. If it happens once, it will happen again so - // don't bother requeuing the pod. - utilruntime.HandleError(err) - return - } - - // delete terminating pods that have been scheduled on - // nodes that do not support graceful termination - // TODO(mikedanese): this can be removed when we no longer - // guarantee backwards compatibility of master API to kubelets with - // versions less than 1.1.0 - v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion) - if err != nil { - glog.V(0).Infof("Couldn't parse version %q of node: %v", node.Status.NodeInfo.KubeletVersion, err) - utilruntime.HandleError(nc.forcefullyDeletePod(pod)) - return - } - if v.LessThan(gracefulDeletionVersion) { - utilruntime.HandleError(nc.forcefullyDeletePod(pod)) - return - } -} - -// update ready status of all pods running on given node from master -// return true if success -func markAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error { +// MarkAllPodsNotReady updates ready status of all pods running on +// given node from master. Returns an error if the update fails. +func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error { // Don't set pods to NotReady if the kubelet is running a version that // doesn't understand how to correct readiness. // TODO: Remove this check when we no longer guarantee backward compatibility // with node versions < 1.2.0. - if nodeRunningOutdatedKubelet(node) { + if NodeRunningOutdatedKubelet(node) { return nil } nodeName := node.Name @@ -233,11 +195,11 @@ func markAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error { return fmt.Errorf("%v", strings.Join(errMsg, "; ")) } -// nodeRunningOutdatedKubelet returns true if the kubeletVersion reported +// NodeRunningOutdatedKubelet returns true if the kubeletVersion reported // in the nodeInfo of the given node is "outdated", meaning < 1.2.0. // Older versions were inflexible and modifying pod.Status directly through // the apiserver would result in unexpected outcomes. -func nodeRunningOutdatedKubelet(node *v1.Node) bool { +func NodeRunningOutdatedKubelet(node *v1.Node) bool { v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion) if err != nil { glog.Errorf("couldn't parse version %q of node %v", node.Status.NodeInfo.KubeletVersion, err) @@ -250,7 +212,9 @@ func nodeRunningOutdatedKubelet(node *v1.Node) bool { return false } -func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) { +// NodeExistsInCloudProvider returns true if the node exists in the +// cloud provider.
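The gate in MarkAllPodsNotReady / NodeRunningOutdatedKubelet is a semantic-version comparison against podStatusReconciliationVersion ("v1.2.0"). A rough sketch of that check using the same pkg/util/version helpers; the function name is invented, and treating an unparseable version as outdated is an assumption of this sketch:

package main

import (
	"fmt"

	utilversion "k8s.io/kubernetes/pkg/util/version"
)

// outdatedKubelet is a hypothetical stand-in for NodeRunningOutdatedKubelet,
// operating on a raw version string instead of a *v1.Node.
func outdatedKubelet(kubeletVersion string) bool {
	minVersion := utilversion.MustParseSemantic("v1.2.0")
	v, err := utilversion.ParseSemantic(kubeletVersion)
	if err != nil {
		// Unparseable versions are treated as outdated in this sketch.
		return true
	}
	return v.LessThan(minVersion)
}

func main() {
	fmt.Println(outdatedKubelet("v1.1.7")) // true: pod.Status is left alone
	fmt.Println(outdatedKubelet("v1.6.4")) // false: safe to mark pods NotReady
}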
+func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) { instances, ok := cloud.Instances() if !ok { return false, fmt.Errorf("%v", ErrCloudInstance) @@ -264,7 +228,8 @@ func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.Nod return true, nil } -func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) { +// RecordNodeEvent records an event related to a node. +func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) { ref := &v1.ObjectReference{ Kind: "Node", Name: nodeName, @@ -275,21 +240,23 @@ func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event) } -func recordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) { +// RecordNodeStatusChange records an event related to a node status change. +func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) { ref := &v1.ObjectReference{ Kind: "Node", Name: node.Name, UID: node.UID, Namespace: "", } - glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name) + glog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name) // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. - recorder.Eventf(ref, v1.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) + recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus) } -// Returns true in case of success and false otherwise -func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool { +// SwapNodeControllerTaint returns true in case of success and false +// otherwise. +func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool { taintToAdd.TimeAdded = metav1.Now() err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintToAdd) if err != nil { @@ -317,7 +284,8 @@ func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintTo return true } -func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { +// CreateAddNodeHandler creates an add node handler. +func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { return func(originalObj interface{}) { obj, err := scheme.Scheme.DeepCopy(originalObj) if err != nil { @@ -332,7 +300,8 @@ func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { } } -func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) { +// CreateUpdateNodeHandler creates a node update handler. +func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) { return func(origOldObj, origNewObj interface{}) { oldObj, err := scheme.Scheme.DeepCopy(origOldObj) if err != nil { @@ -353,7 +322,8 @@ func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb } } -func createDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { +// CreateDeleteNodeHandler creates a delete node handler.
+func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { return func(originalObj interface{}) { obj, err := scheme.Scheme.DeepCopy(originalObj) if err != nil {
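The exported CreateAddNodeHandler / CreateUpdateNodeHandler / CreateDeleteNodeHandler factories adapt plain func(*v1.Node) callbacks to the interface{}-typed handlers an informer expects, deep-copying the incoming object before invoking the callback. A hypothetical wiring sketch; the callbacks and logging below are illustrative and not taken from the node controller:

package main

import (
	"github.com/golang/glog"

	"k8s.io/api/core/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"

	"k8s.io/kubernetes/pkg/controller/node/util"
)

// watchNodes shows how the handler factories can be plugged into a shared
// node informer's event handlers.
func watchNodes(nodeInformer coreinformers.NodeInformer) {
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
			glog.V(4).Infof("node added: %s", node.Name)
			return nil
		}),
		UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
			glog.V(4).Infof("node updated: %s", newNode.Name)
			return nil
		}),
		DeleteFunc: util.CreateDeleteNodeHandler(func(node *v1.Node) error {
			glog.V(4).Infof("node deleted: %s", node.Name)
			return nil
		}),
	})
}

func main() {}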