Merge pull request #50337 from bowei/node-controller-repackage

Automatic merge from submit-queue (batch tested with PRs 50016, 49583, 49930, 46254, 50337)

Break up node controller into packages

This change contains NO actual code changes other than moving constituent
parts into their own packages.

```release-note
NONE
```
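
For orientation, here is a minimal sketch (not part of the commit) of what downstream callers import once the node controller is split up; the printed values and the blank-identifier reference are purely illustrative:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/controller/node/ipam"
	"k8s.io/kubernetes/pkg/controller/node/scheduler"
	"k8s.io/kubernetes/pkg/controller/node/util"
)

func main() {
	// CIDR allocation moved from pkg/controller/node to node/ipam.
	fmt.Println(ipam.RangeAllocatorType, ipam.CloudAllocatorType)

	// Eviction and taint machinery moved to node/scheduler.
	fmt.Println(scheduler.NodeEvictionPeriod, scheduler.EvictionRateLimiterBurst)

	// Shared helpers (DeletePods, SwapNodeControllerTaint, ...) moved to node/util.
	_ = util.DeletePods
}
```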
Kubernetes Submit Queue 2017-08-09 14:14:35 -07:00 committed by GitHub
commit fdc65025ee
23 changed files with 567 additions and 286 deletions

View File

@ -63,6 +63,7 @@ go_library(
"//pkg/controller/job:go_default_library",
"//pkg/controller/namespace:go_default_library",
"//pkg/controller/node:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/podautoscaler:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//pkg/controller/podgc:go_default_library",

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/controller/garbagecollector"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/podgc"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
@ -108,7 +109,7 @@ func startNodeController(ctx ControllerContext) (bool, error) {
serviceCIDR,
int(ctx.Options.NodeCIDRMaskSize),
ctx.Options.AllocateNodeCIDRs,
nodecontroller.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ctx.Options.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
)

View File

@ -10,25 +10,20 @@ load(
go_test(
name = "go_default_test",
srcs = [
"cidr_allocator_test.go",
"cidr_set_test.go",
"nodecontroller_test.go",
"rate_limited_queue_test.go",
"taint_controller_test.go",
"timed_workers_test.go",
],
srcs = ["nodecontroller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/node/scheduler:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/controller/testutil:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/taints:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
@ -36,7 +31,6 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
@ -44,35 +38,24 @@ go_test(
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"cidr_allocator.go",
"cidr_set.go",
"cloud_cidr_allocator.go",
"controller_utils.go",
"doc.go",
"metrics.go",
"node_controller.go",
"range_allocator.go",
"rate_limited_queue.go",
"taint_controller.go",
"timed_workers.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/helper:go_default_library",
"//pkg/api/v1/helper:go_default_library",
"//pkg/api/v1/node:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/node/scheduler:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/system:go_default_library",
@ -88,9 +71,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
@ -102,7 +83,6 @@ go_library(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
@ -115,6 +95,11 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/controller/node/ipam:all-srcs",
"//pkg/controller/node/scheduler:all-srcs",
"//pkg/controller/node/util:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,68 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["cidr_allocator_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"cidr_allocator.go",
"cloud_cidr_allocator.go",
"range_allocator.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/controller/node/ipam/cidrset:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,7 @@
approvers:
- bowei
- dnardo
reviewers:
- bowei
- dnardo
- freehan

View File

@ -14,18 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"errors"
"net"
v1 "k8s.io/api/core/v1"
)
var errCIDRRangeNoCIDRsRemaining = errors.New(
"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
type nodeAndCIDR struct {
cidr *net.IPNet
nodeName string
@ -35,7 +31,11 @@ type nodeAndCIDR struct {
type CIDRAllocatorType string
const (
// RangeAllocatorType is the allocator that uses an internal CIDR
// range allocator to do node CIDR range allocations.
RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
// CloudAllocatorType is the allocator that uses cloud platform
// support to do node CIDR range allocations.
CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
)
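
The node controller switches on this type to pick an allocator implementation (see the node_controller.go hunk later in this diff). As a hedged illustration, a caller might validate a raw configuration value like this; `pickAllocatorType` is a hypothetical helper, not part of the commit:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/controller/node/ipam"
)

// pickAllocatorType converts a raw configuration string into the exported
// CIDRAllocatorType and rejects anything unknown. (Hypothetical helper.)
func pickAllocatorType(raw string) (ipam.CIDRAllocatorType, error) {
	t := ipam.CIDRAllocatorType(raw)
	switch t {
	case ipam.RangeAllocatorType, ipam.CloudAllocatorType:
		return t, nil
	default:
		return "", fmt.Errorf("invalid CIDR allocator type: %q", raw)
	}
}

func main() {
	t, err := pickAllocatorType("RangeAllocator")
	fmt.Println(t, err) // RangeAllocator <nil>
}
```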

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"net"
@ -143,7 +143,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
if err = rangeAllocator.cidrs.occupy(cidr); err != nil {
if err = rangeAllocator.cidrs.Occupy(cidr); err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}
}
@ -225,7 +225,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
err = rangeAllocator.cidrs.occupy(cidr)
err = rangeAllocator.cidrs.Occupy(cidr)
if err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}
@ -337,7 +337,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
err = rangeAllocator.cidrs.occupy(cidr)
err = rangeAllocator.cidrs.Occupy(cidr)
if err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}

View File

@ -0,0 +1,36 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["cidr_set_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor/github.com/golang/glog:go_default_library"],
)
go_library(
name = "go_default_library",
srcs = ["cidr_set.go"],
tags = ["automanaged"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -14,17 +14,20 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package cidrset
import (
"encoding/binary"
"errors"
"fmt"
"math/big"
"net"
"sync"
)
type cidrSet struct {
// CidrSet manages a set of CIDR ranges from which blocks of IPs can
// be allocated.
type CidrSet struct {
sync.Mutex
clusterCIDR *net.IPNet
clusterIP net.IP
@ -44,7 +47,15 @@ const (
maxPrefixLength = 64
)
func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet {
var (
// ErrCIDRRangeNoCIDRsRemaining occurs when there is no more space
// to allocate CIDR ranges.
ErrCIDRRangeNoCIDRsRemaining = errors.New(
"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
)
// NewCIDRSet creates a new CidrSet.
func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *CidrSet {
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
@ -54,7 +65,7 @@ func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet {
} else {
maxCIDRs = 1 << uint32(subNetMaskSize-clusterMaskSize)
}
return &cidrSet{
return &CidrSet{
clusterCIDR: clusterCIDR,
clusterIP: clusterCIDR.IP,
clusterMaskSize: clusterMaskSize,
@ -63,7 +74,7 @@ func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet {
}
}
func (s *cidrSet) indexToCIDRBlock(index int) *net.IPNet {
func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet {
var ip []byte
var mask int
switch /*v4 or v6*/ {
@ -91,7 +102,8 @@ func (s *cidrSet) indexToCIDRBlock(index int) *net.IPNet {
}
}
func (s *cidrSet) allocateNext() (*net.IPNet, error) {
// AllocateNext allocates the next free CIDR range.
func (s *CidrSet) AllocateNext() (*net.IPNet, error) {
s.Lock()
defer s.Unlock()
@ -104,7 +116,7 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
}
}
if nextUnused == -1 {
return nil, errCIDRRangeNoCIDRsRemaining
return nil, ErrCIDRRangeNoCIDRsRemaining
}
s.nextCandidate = (nextUnused + 1) % s.maxCIDRs
@ -113,7 +125,7 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
return s.indexToCIDRBlock(nextUnused), nil
}
func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
func (s *CidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
begin, end = 0, s.maxCIDRs-1
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()
@ -160,7 +172,8 @@ func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err
return begin, end, nil
}
func (s *cidrSet) release(cidr *net.IPNet) error {
// Release releases the given CIDR range.
func (s *CidrSet) Release(cidr *net.IPNet) error {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
@ -173,7 +186,8 @@ func (s *cidrSet) release(cidr *net.IPNet) error {
return nil
}
func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
// Occupy marks the given CIDR range as used.
func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
@ -188,7 +202,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
return nil
}
func (s *cidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) {
func (s *CidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) {
var cidrIndex uint32
if cidr.IP.To4() != nil {
cidrIndex = (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-s.subNetMaskSize)
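
With `CidrSet` and its methods exported, other packages can drive the set directly. Below is a minimal usage sketch based only on the signatures above; the cluster CIDR and mask size are arbitrary example values:

```go
package main

import (
	"fmt"
	"net"

	"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
)

func main() {
	// Carve /24 blocks out of 10.0.0.0/16 (example values).
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/16")
	set := cidrset.NewCIDRSet(clusterCIDR, 24)

	// Mark a block that is already in use elsewhere as occupied.
	_, used, _ := net.ParseCIDR("10.0.5.0/24")
	if err := set.Occupy(used); err != nil {
		fmt.Println("occupy failed:", err)
	}

	// Hand out the next free block, then return it to the pool.
	block, err := set.AllocateNext()
	if err != nil {
		// cidrset.ErrCIDRRangeNoCIDRsRemaining once the range is exhausted.
		fmt.Println("allocation failed:", err)
		return
	}
	fmt.Println("allocated", block)
	_ = set.Release(block)
}
```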

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package cidrset
import (
"math/big"
@ -47,9 +47,9 @@ func TestCIDRSetFullyAllocated(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
a := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
p, err := a.allocateNext()
p, err := a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
@ -58,14 +58,14 @@ func TestCIDRSetFullyAllocated(t *testing.T) {
p.String(), tc.expectedCIDR, tc.description)
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
a.release(p)
a.Release(p)
p, err = a.allocateNext()
p, err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
@ -73,7 +73,7 @@ func TestCIDRSetFullyAllocated(t *testing.T) {
t.Fatalf("unexpected allocated cidr: %v, expecting %v for %v",
p.String(), tc.expectedCIDR, tc.description)
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
@ -133,7 +133,7 @@ func TestIndexToCIDRBlock(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, tc.subnetMaskSize)
a := NewCIDRSet(clusterCIDR, tc.subnetMaskSize)
cidr := a.indexToCIDRBlock(tc.index)
if cidr.String() != tc.CIDRBlock {
t.Fatalf("error for %v index %d %s", tc.description, tc.index, cidr.String())
@ -157,12 +157,12 @@ func TestCIDRSet_RandomishAllocation(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, 24)
a := NewCIDRSet(clusterCIDR, 24)
// allocate all the CIDRs
var cidrs []*net.IPNet
for i := 0; i < 256; i++ {
if c, err := a.allocateNext(); err == nil {
if c, err := a.AllocateNext(); err == nil {
cidrs = append(cidrs, c)
} else {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
@ -170,25 +170,25 @@ func TestCIDRSet_RandomishAllocation(t *testing.T) {
}
var err error
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// release them all
for i := 0; i < len(cidrs); i++ {
a.release(cidrs[i])
a.Release(cidrs[i])
}
// allocate the CIDRs again
var rcidrs []*net.IPNet
for i := 0; i < 256; i++ {
if c, err := a.allocateNext(); err == nil {
if c, err := a.AllocateNext(); err == nil {
rcidrs = append(rcidrs, c)
} else {
t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description)
}
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
@ -215,14 +215,14 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, 24)
a := NewCIDRSet(clusterCIDR, 24)
// allocate all the CIDRs
var cidrs []*net.IPNet
var num_cidrs = 256
var numCIDRs = 256
for i := 0; i < num_cidrs; i++ {
if c, err := a.allocateNext(); err == nil {
for i := 0; i < numCIDRs; i++ {
if c, err := a.AllocateNext(); err == nil {
cidrs = append(cidrs, c)
} else {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
@ -230,35 +230,35 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
}
var err error
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// release them all
for i := 0; i < len(cidrs); i++ {
a.release(cidrs[i])
a.Release(cidrs[i])
}
// occupy the last 128 CIDRs
for i := num_cidrs / 2; i < num_cidrs; i++ {
a.occupy(cidrs[i])
for i := numCIDRs / 2; i < numCIDRs; i++ {
a.Occupy(cidrs[i])
}
// allocate the first 128 CIDRs again
var rcidrs []*net.IPNet
for i := 0; i < num_cidrs/2; i++ {
if c, err := a.allocateNext(); err == nil {
for i := 0; i < numCIDRs/2; i++ {
if c, err := a.AllocateNext(); err == nil {
rcidrs = append(rcidrs, c)
} else {
t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description)
}
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// check Occupy() work properly
for i := num_cidrs / 2; i < num_cidrs; i++ {
for i := numCIDRs / 2; i < numCIDRs; i++ {
rcidrs = append(rcidrs, cidrs[i])
}
if !reflect.DeepEqual(cidrs, rcidrs) {
@ -394,7 +394,7 @@ func TestGetBitforCIDR(t *testing.T) {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
cs := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
@ -562,14 +562,14 @@ func TestOccupy(t *testing.T) {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
cs := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
err = cs.occupy(subnetCIDR)
err = cs.Occupy(subnetCIDR)
if err == nil && tc.expectErr {
t.Errorf("expected error but got none for %v", tc.description)
continue
@ -629,9 +629,9 @@ func TestCIDRSetv6(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
a := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
p, err := a.allocateNext()
p, err := a.AllocateNext()
if err == nil && tc.expectErr {
t.Errorf("expected error but got none for %v", tc.description)
continue
@ -645,7 +645,7 @@ func TestCIDRSetv6(t *testing.T) {
t.Fatalf("unexpected allocated cidr: %s for %v", p.String(), tc.description)
}
}
p2, err := a.allocateNext()
p2, err := a.AllocateNext()
if !tc.expectErr {
if p2.String() != tc.expectedCIDR2 {
t.Fatalf("unexpected allocated cidr: %s for %v", p2.String(), tc.description)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"fmt"
@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller/node/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
@ -50,6 +51,7 @@ type cloudCIDRAllocator struct {
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
func NewCloudCIDRAllocator(
client clientset.Interface,
cloud cloudprovider.Interface) (ca CIDRAllocator, err error) {
@ -79,12 +81,12 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
cidrs, err := ca.cloud.AliasRanges(types.NodeName(node.Name))
if err != nil {
recordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
if len(cidrs) == 0 {
recordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
glog.V(2).Infof("Node %v has no CIDRs", node.Name)
return fmt.Errorf("failed to allocate cidr (none exist)")
}

View File

@ -14,13 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"fmt"
"net"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -31,7 +33,8 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/node/util"
)
// TODO: figure out the good setting for those constants.
@ -45,7 +48,7 @@ const (
type rangeAllocator struct {
client clientset.Interface
cidrs *cidrSet
cidrs *cidrset.CidrSet
clusterCIDR *net.IPNet
maxCIDRs int
// Channel that is used to pass updating Nodes with assigned CIDRs to the background
@ -74,7 +77,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
ra := &rangeAllocator{
client: client,
cidrs: newCIDRSet(clusterCIDR, subNetMaskSize),
cidrs: cidrset.NewCIDRSet(clusterCIDR, subNetMaskSize),
clusterCIDR: clusterCIDR,
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
recorder: recorder,
@ -150,7 +153,7 @@ func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
if err != nil {
return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
}
if err := r.cidrs.occupy(podCIDR); err != nil {
if err := r.cidrs.Occupy(podCIDR); err != nil {
return fmt.Errorf("failed to mark cidr as occupied: %v", err)
}
return nil
@ -169,10 +172,10 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
if node.Spec.PodCIDR != "" {
return r.occupyCIDR(node)
}
podCIDR, err := r.cidrs.allocateNext()
podCIDR, err := r.cidrs.AllocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
util.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
@ -194,7 +197,7 @@ func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error {
}
glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
if err = r.cidrs.release(podCIDR); err != nil {
if err = r.cidrs.Release(podCIDR); err != nil {
return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
}
return err
@ -212,7 +215,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
return
}
if err := r.cidrs.occupy(serviceCIDR); err != nil {
if err := r.cidrs.Occupy(serviceCIDR); err != nil {
glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
}
}
@ -232,7 +235,7 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
if node.Spec.PodCIDR != "" {
glog.Errorf("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
if node.Spec.PodCIDR != data.cidr.String() {
if err := r.cidrs.release(data.cidr); err != nil {
if err := r.cidrs.Release(data.cidr); err != nil {
glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
}
}
@ -246,13 +249,13 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
}
if err != nil {
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
// We accept the fact that we may leak CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
if releaseErr := r.cidrs.Release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
}
}

View File

@ -17,12 +17,13 @@ limitations under the License.
package node
import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -47,14 +48,15 @@ import (
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/node/scheduler"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/system"
taintutils "k8s.io/kubernetes/pkg/util/taints"
utilversion "k8s.io/kubernetes/pkg/util/version"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"github.com/golang/glog"
)
func init() {
@ -63,13 +65,8 @@ func init() {
}
var (
ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0")
// The minimum kubelet version for which the nodecontroller
// can safely flip pod.Status to NotReady.
podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0")
UnreachableTaintTemplate = &v1.Taint{
Key: algorithm.TaintNodeUnreachable,
Effect: v1.TaintEffectNoExecute,
@ -82,12 +79,6 @@ var (
)
const (
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
nodeStatusUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
// Burst value for all eviction rate limiters
evictionRateLimiterBurst = 1
// The amount of time the nodecontroller polls on the list nodes endpoint.
apiserverStartupGracePeriod = 10 * time.Minute
// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
@ -111,7 +102,7 @@ type nodeStatusData struct {
type NodeController struct {
allocateNodeCIDRs bool
allocatorType CIDRAllocatorType
allocatorType ipam.CIDRAllocatorType
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
@ -150,9 +141,9 @@ type NodeController struct {
// Lock to access evictor workers
evictorLock sync.Mutex
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*RateLimitedTimedQueue
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNoExecuteTainer map[string]*RateLimitedTimedQueue
zoneNoExecuteTainer map[string]*scheduler.RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
@ -166,9 +157,9 @@ type NodeController struct {
podInformerSynced cache.InformerSynced
cidrAllocator CIDRAllocator
cidrAllocator ipam.CIDRAllocator
taintManager *NoExecuteTaintManager
taintManager *scheduler.NoExecuteTaintManager
forcefullyDeletePod func(*v1.Pod) error
nodeExistsInCloudProvider func(types.NodeName) (bool, error)
@ -213,7 +204,7 @@ func NewNodeController(
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool,
allocatorType CIDRAllocatorType,
allocatorType ipam.CIDRAllocatorType,
runTaintManager bool,
useTaintBasedEvictions bool) (*NodeController, error) {
eventBroadcaster := record.NewBroadcaster()
@ -247,8 +238,8 @@ func NewNodeController(
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneNoExecuteTainer: make(map[string]*RateLimitedTimedQueue),
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
@ -259,8 +250,8 @@ func NewNodeController(
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
allocatorType: allocatorType,
forcefullyDeletePod: func(p *v1.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
forcefullyDeletePod: func(p *v1.Pod) error { return util.ForcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return util.NodeExistsInCloudProvider(cloud, nodeName) },
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
@ -334,11 +325,11 @@ func NewNodeController(
}
switch nc.allocatorType {
case RangeAllocatorType:
nc.cidrAllocator, err = NewCIDRRangeAllocator(
case ipam.RangeAllocatorType:
nc.cidrAllocator, err = ipam.NewCIDRRangeAllocator(
kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
case CloudAllocatorType:
nc.cidrAllocator, err = NewCloudCIDRAllocator(kubeClient, cloud)
case ipam.CloudAllocatorType:
nc.cidrAllocator, err = ipam.NewCloudCIDRAllocator(kubeClient, cloud)
default:
return nil, fmt.Errorf("Invalid CIDR allocator type: %v", nc.allocatorType)
}
@ -348,8 +339,8 @@ func NewNodeController(
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: createAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
UpdateFunc: createUpdateNodeHandler(func(_, newNode *v1.Node) error {
AddFunc: util.CreateAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
@ -374,26 +365,26 @@ func NewNodeController(
}
return nil
}),
DeleteFunc: createDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
DeleteFunc: util.CreateDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
})
}
if nc.runTaintManager {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: createAddNodeHandler(func(node *v1.Node) error {
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: createUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: createDeleteNodeHandler(func(node *v1.Node) error {
DeleteFunc: util.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
})
nc.taintManager = NewNoExecuteTaintManager(kubeClient)
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient)
}
nc.nodeLister = nodeInformer.Lister()
@ -410,7 +401,7 @@ func (nc *NodeController) doEvictionPass() {
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
@ -421,7 +412,7 @@ func (nc *NodeController) doEvictionPass() {
EvictionsNumber.WithLabelValues(zone).Inc()
}
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
@ -439,7 +430,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainer {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNoExecuteTainer[k].Try(func(value TimedValue) (bool, time.Duration) {
nc.zoneNoExecuteTainer[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
@ -468,7 +459,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
return true, 0
}
return swapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0
return util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0
})
}
}
@ -498,12 +489,12 @@ func (nc *NodeController) Run(stopCh <-chan struct{}) {
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
}
<-stopCh
@ -516,12 +507,12 @@ func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) {
nc.zoneStates[zone] = stateInitial
if !nc.useTaintBasedEvictions {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
} else {
nc.zoneNoExecuteTainer[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
}
// Init the metric for the new zone.
glog.Infof("Initializing eviction metric for zone: %v", zone)
@ -547,7 +538,7 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range added {
glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
@ -559,7 +550,7 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
@ -574,7 +565,7 @@ func (nc *NodeController) monitorNodeStatus() error {
continue
}
node := nodeCopy.(*v1.Node)
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeStatusUpdateRetry, func() (bool, error) {
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
if err == nil {
return true, nil
@ -605,7 +596,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@ -632,7 +623,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@ -672,8 +663,8 @@ func (nc *NodeController) monitorNodeStatus() error {
// Report node event.
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
util.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = util.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
@ -688,13 +679,13 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if !exists {
glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
recordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
util.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := forcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
if err := util.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
@ -1122,3 +1113,55 @@ func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeConditi
return notReadyNodes, stateNormal
}
}
// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a Pod %#v", obj)
return
}
}
// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}
node, err := nc.nodeLister.Get(pod.Spec.NodeName)
// if there is no such node, do nothing and let the podGC clean it up.
if apierrors.IsNotFound(err) {
return
}
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
utilruntime.HandleError(err)
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("Couldn't parse version %q of node: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
if v.LessThan(gracefulDeletionVersion) {
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
}
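
The version gate in `maybeDeleteTerminatingPod` is the whole decision; here is a standalone sketch of the same comparison using the version utility (the sample kubelet version string is made up):

```go
package main

import (
	"fmt"

	utilversion "k8s.io/kubernetes/pkg/util/version"
)

func main() {
	gracefulDeletionVersion := utilversion.MustParseSemantic("v1.1.0")

	// Hypothetical kubelet version reported by a node.
	v, err := utilversion.ParseSemantic("v1.0.6")
	if err != nil {
		fmt.Println("unparseable version, force-delete terminating pods:", err)
		return
	}
	// Kubelets older than v1.1.0 do not honor graceful termination, so
	// their terminating pods are deleted forcefully.
	fmt.Println("force delete:", v.LessThan(gracefulDeletionVersion)) // true
}
```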

View File

@ -39,6 +39,9 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/node/scheduler"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/controller/testutil"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/util/node"
@ -103,7 +106,7 @@ func NewNodeControllerFromClient(
serviceCIDR,
nodeCIDRMaskSize,
allocateNodeCIDRs,
RangeAllocatorType,
ipam.RangeAllocatorType,
useTaints,
useTaints,
)
@ -637,9 +640,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
zones := testutil.GetZones(item.fakeNodeHandler)
for _, zone := range zones {
if _, ok := nodeController.zonePodEvictor[zone]; ok {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
return true, 0
})
} else {
@ -782,9 +785,9 @@ func TestPodStatusChange(t *testing.T) {
}
zones := testutil.GetZones(item.fakeNodeHandler)
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
return true, 0
})
}
@ -1337,9 +1340,9 @@ func (nc *nodeController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler)
var podEvicted bool
zones := testutil.GetZones(fakeNodeHandler)
for _, zone := range zones {
nc.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
deletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore)
util.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore)
return true, 0
})
}
@ -2310,7 +2313,7 @@ func TestCheckNodeKubeletVersionParsing(t *testing.T) {
},
},
}
isOutdated := nodeRunningOutdatedKubelet(n)
isOutdated := util.NodeRunningOutdatedKubelet(n)
if ov.outdated != isOutdated {
t.Errorf("Version %v doesn't match test expectation. Expected outdated %v got %v", n.Status.NodeInfo.KubeletVersion, ov.outdated, isOutdated)
} else {

View File

@ -0,0 +1,69 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"rate_limited_queue_test.go",
"taint_controller_test.go",
"timed_workers_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"rate_limited_queue.go",
"taint_controller.go",
"timed_workers.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/helper:go_default_library",
"//pkg/api/v1/helper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"container/heap"
@ -27,6 +27,18 @@ import (
"github.com/golang/glog"
)
const (
// NodeStatusUpdateRetry controls the number of retries of writing
// NodeStatus update.
NodeStatusUpdateRetry = 5
// NodeEvictionPeriod controls how often NodeController will try to
// evict Pods from non-responsive Nodes.
NodeEvictionPeriod = 100 * time.Millisecond
// EvictionRateLimiterBurst is the burst value for all eviction rate
// limiters
EvictionRateLimiterBurst = 1
)
// TimedValue is a value that should be processed at a designated time.
type TimedValue struct {
Value string
@ -37,19 +49,26 @@ type TimedValue struct {
}
// now is used to test time
var now func() time.Time = time.Now
var now = time.Now
// TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
type TimedQueue []*TimedValue
func (h TimedQueue) Len() int { return len(h) }
func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Len is the length of the queue.
func (h TimedQueue) Len() int { return len(h) }
// Less returns true if queue[i] < queue[j].
func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
// Swap swaps index i and j.
func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Push a new TimedValue on to the queue.
func (h *TimedQueue) Push(x interface{}) {
*h = append(*h, x.(*TimedValue))
}
// Pop the lowest ProcessAt item.
func (h *TimedQueue) Pop() interface{} {
old := *h
n := len(old)
@ -58,16 +77,17 @@ func (h *TimedQueue) Pop() interface{} {
return x
}
// A FIFO queue which additionally guarantees that any element can be added only once until
// it is removed.
// UniqueQueue is a FIFO queue which additionally guarantees that any
// element can be added only once until it is removed.
type UniqueQueue struct {
lock sync.Mutex
queue TimedQueue
set sets.String
}
// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
// Remove call. Returns true if new value was added.
// Add a new value to the queue if it wasn't added before, or was
// explicitly removed by the Remove call. Returns true if new value
// was added.
func (q *UniqueQueue) Add(value TimedValue) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -80,8 +100,9 @@ func (q *UniqueQueue) Add(value TimedValue) bool {
return true
}
// Replace replaces an existing value in the queue if it already exists, otherwise it does nothing.
// Returns true if the item was found.
// Replace replaces an existing value in the queue if it already
// exists, otherwise it does nothing. Returns true if the item was
// found.
func (q *UniqueQueue) Replace(value TimedValue) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -97,8 +118,9 @@ func (q *UniqueQueue) Replace(value TimedValue) bool {
return false
}
// Removes the value from the queue, but keeps it in the set, so it won't be added second time.
// Returns true if something was removed.
// RemoveFromQueue removes the value from the queue, but keeps it in
// the set, so it won't be added a second time. Returns true if
// something was removed.
func (q *UniqueQueue) RemoveFromQueue(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -115,8 +137,9 @@ func (q *UniqueQueue) RemoveFromQueue(value string) bool {
return false
}
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
// Remove removes the value from the queue, so a Get() call won't
// return it, and allows subsequent addition of the given value. If
// the value is not present, it does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
@ -134,7 +157,7 @@ func (q *UniqueQueue) Remove(value string) bool {
return true
}
// Returns the oldest added value that wasn't returned yet.
// Get returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (TimedValue, bool) {
q.lock.Lock()
defer q.lock.Unlock()
@ -146,7 +169,8 @@ func (q *UniqueQueue) Get() (TimedValue, bool) {
return *result, true
}
// Head returns the oldest added value that wasn't returned yet without removing it.
// Head returns the oldest added value that wasn't returned yet
// without removing it.
func (q *UniqueQueue) Head() (TimedValue, bool) {
q.lock.Lock()
defer q.lock.Unlock()
@ -157,7 +181,8 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
return *result, true
}
// Clear removes all items from the queue and duplication preventing set.
// Clear removes all items from the queue and duplication preventing
// set.
func (q *UniqueQueue) Clear() {
q.lock.Lock()
defer q.lock.Unlock()
@ -169,15 +194,16 @@ func (q *UniqueQueue) Clear() {
}
}
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
// of execution. It is also rate limited.
// RateLimitedTimedQueue is a unique item priority queue ordered by
// the expected next time of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
queue UniqueQueue
limiterLock sync.Mutex
limiter flowcontrol.RateLimiter
}
// Creates new queue which will use given RateLimiter to oversee execution.
// NewRateLimitedTimedQueue creates new queue which will use given
// RateLimiter to oversee execution.
func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
return &RateLimitedTimedQueue{
queue: UniqueQueue{
@ -188,18 +214,21 @@ func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimed
}
}
// ActionFunc takes a timed value and returns false if the item must be retried, with an optional
// time.Duration if some minimum wait interval should be used.
// ActionFunc takes a timed value and returns false if the item must
// be retried, with an optional time.Duration if some minimum wait
// interval should be used.
type ActionFunc func(TimedValue) (bool, time.Duration)
// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
// time to execute the next item in the queue. The same value is processed only once unless
// Remove is explicitly called on it (it's done by the cancelPodEviction function in NodeController
// when Node becomes Ready again)
// TODO: figure out a good way to do garbage collection for all Nodes that were removed from
// the cluster.
// Try processes the queue. Ends prematurely if RateLimiter forbids an
// action and leak is true. Otherwise, requeues the item to be
// processed. Each value is processed once if fn returns true,
// otherwise it is added back to the queue. The returned remaining is
// used to identify the minimum time to execute the next item in the
// queue. The same value is processed only once unless Remove is
// explicitly called on it (it's done by the cancelPodEviction
// function in NodeController when Node becomes Ready again).
// TODO: figure out a good way to do garbage collection for all Nodes
// that were removed from the cluster.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
q.limiterLock.Lock()
@ -227,8 +256,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
}
}
// Adds value to the queue to be processed. Won't add the same value(comparsion by value) a second time
// if it was already added and not removed.
// Add value to the queue to be processed. Won't add the same
// value (comparison by value) a second time if it was already added
// and not removed.
func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
now := now()
return q.queue.Add(TimedValue{
@ -239,17 +269,19 @@ func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
})
}
// Removes Node from the Evictor. The Node won't be processed until added again.
// Remove Node from the Evictor. The Node won't be processed until
// added again.
func (q *RateLimitedTimedQueue) Remove(value string) bool {
return q.queue.Remove(value)
}
// Removes all items from the queue
// Clear removes all items from the queue
func (q *RateLimitedTimedQueue) Clear() {
q.queue.Clear()
}
// SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
// SwapLimiter safely swaps current limiter for this queue with the
// passed one if capacities or qps's differ.
func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
q.limiterLock.Lock()
defer q.limiterLock.Unlock()
@ -260,7 +292,7 @@ func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
if newQPS <= 0 {
newLimiter = flowcontrol.NewFakeNeverRateLimiter()
} else {
newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst)
newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, EvictionRateLimiterBurst)
}
// If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
// TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
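
With the queue types exported from `scheduler`, the eviction plumbing can be exercised on its own. Here is a hedged sketch that uses only the signatures shown above; the node name and UID are made-up examples:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/kubernetes/pkg/controller/node/scheduler"
)

func main() {
	// One token per second, with the burst value the node controller uses.
	q := scheduler.NewRateLimitedTimedQueue(
		flowcontrol.NewTokenBucketRateLimiter(1.0, scheduler.EvictionRateLimiterBurst))

	// Queue a node for processing; the UID can be any identifier.
	q.Add("node-1", "uid-1")

	// Try drains items that are due: returning true marks an item done,
	// returning false with a delay re-queues it.
	q.Try(func(v scheduler.TimedValue) (bool, time.Duration) {
		fmt.Println("processing", v.Value)
		return true, 0
	})
}
```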

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"reflect"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"fmt"
@ -122,7 +122,7 @@ func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, er
time.Sleep(100 * time.Millisecond)
}
if err != nil {
return []v1.Pod{}, fmt.Errorf("Failed to get Pods assigned to node %v. Skipping update.", nodeName)
return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
}
return pods.Items, nil
}
@ -325,9 +325,8 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
} else {
tc.cancelWorkWithEvent(podNamespacedName)
}
tc.cancelWorkWithEvent(podNamespacedName)
}
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"fmt"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"sync"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"sync"

View File

@ -0,0 +1,48 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["controller_utils.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package util
import (
"errors"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
@ -44,9 +45,20 @@ import (
"github.com/golang/glog"
)
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
var (
// ErrCloudInstance occurs when the cloud provider does not support
// the Instances API.
ErrCloudInstance = errors.New("cloud provider doesn't support instances")
// podStatusReconciliationVersion is the minimum kubelet version
// for which the nodecontroller can safely flip pod.Status to
// NotReady.
podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0")
)
// DeletePods will delete all pods from master running on given node,
// and return true if any pods were deleted, or were found pending
// deletion.
func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
options := metav1.ListOptions{FieldSelector: selector}
@ -58,7 +70,7 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
}
if len(pods.Items) > 0 {
recordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}
for _, pod := range pods.Items {
@ -68,8 +80,8 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
}
// Set reason and message in the pod object.
if _, err = setPodTerminationReason(kubeClient, &pod, nodeName); err != nil {
if errors.IsConflict(err) {
if _, err = SetPodTerminationReason(kubeClient, &pod, nodeName); err != nil {
if apierrors.IsConflict(err) {
updateErrList = append(updateErrList,
fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), err))
continue
@ -100,9 +112,10 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
return remaining, nil
}
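
Condensed, the flow above is: select the pods bound to the node by field selector, emit one event, skip DaemonSet-managed pods, and delete the rest. A rough sketch of the list/delete calls, assuming the client-go vintage used in this tree (Core() accessors, no context arguments) and writing out the literal selector that api.PodHostField resolves to; the fake clientset in main only makes the sketch runnable and ignores field selectors:

```go
package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// deleteNodePodsSketch lists the pods bound to nodeName via the
// apiserver and deletes them. Error aggregation, the event, the
// DaemonSet check and the "remaining" bookkeeping of DeletePods are
// all omitted for brevity.
func deleteNodePodsSketch(kubeClient clientset.Interface, nodeName string) error {
	selector := fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
	pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(metav1.ListOptions{FieldSelector: selector})
	if err != nil {
		return err
	}
	for i := range pods.Items {
		pod := &pods.Items[i]
		if pod.DeletionTimestamp != nil {
			continue // already terminating; DeletePods only counts it as remaining
		}
		if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = deleteNodePodsSketch(fake.NewSimpleClientset(), "node-1")
}
```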
// setPodTerminationReason attempts to set a reason and message in the pod status, updates it in the apiserver,
// and returns an error if it encounters one.
func setPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
// SetPodTerminationReason attempts to set a reason and message in the
// pod status, updates it in the apiserver, and returns an error if it
// encounters one.
func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
return pod, nil
}
@ -118,7 +131,8 @@ func setPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa
return updatedPod, nil
}
func forcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
// ForcefullyDeletePod deletes the pod immediately.
func ForcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
var zero int64
glog.Infof("NodeController is force deleting Pod: %v:%v", pod.Namespace, pod.Name)
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &zero})
@ -128,75 +142,23 @@ func forcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
return err
}
// forcefullyDeleteNode deletes the node immediately. The pods on the node are cleaned
// up by the podGC.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
// ForcefullyDeleteNode deletes the node immediately. The pods on the
// node are cleaned up by the podGC.
func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
return nil
}
// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a Pod %#v", obj)
return
}
}
// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}
node, err := nc.nodeLister.Get(pod.Spec.NodeName)
// if there is no such node, do nothing and let the podGC clean it up.
if errors.IsNotFound(err) {
return
}
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
utilruntime.HandleError(err)
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("Couldn't parse version %q of node: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
if v.LessThan(gracefulDeletionVersion) {
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
}
// update ready status of all pods running on given node from master
// return true if success
func markAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
// MarkAllPodsNotReady updates the ready status of all pods running on
// the given node from the master, returning an error on failure.
func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
// Don't set pods to NotReady if the kubelet is running a version that
// doesn't understand how to correct readiness.
// TODO: Remove this check when we no longer guarantee backward compatibility
// with node versions < 1.2.0.
if nodeRunningOutdatedKubelet(node) {
if NodeRunningOutdatedKubelet(node) {
return nil
}
nodeName := node.Name
@ -233,11 +195,11 @@ func markAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}
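
The per-pod work elided by this hunk flips the pod's Ready condition to False and writes it back through the status subresource; roughly, per pod (helper name invented, same pre-context client surface, error aggregation dropped):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// markPodNotReadySketch is the per-pod step behind MarkAllPodsNotReady:
// set the PodReady condition to False and persist it via UpdateStatus.
func markPodNotReadySketch(kubeClient clientset.Interface, pod *v1.Pod) error {
	for i := range pod.Status.Conditions {
		if pod.Status.Conditions[i].Type != v1.PodReady {
			continue
		}
		pod.Status.Conditions[i].Status = v1.ConditionFalse
		_, err := kubeClient.Core().Pods(pod.Namespace).UpdateStatus(pod)
		return err
	}
	return nil // no Ready condition recorded yet; nothing to flip
}

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "web-0", Namespace: "default"},
		Status:     v1.PodStatus{Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}},
	}
	client := fake.NewSimpleClientset(pod)
	fmt.Println("err:", markPodNotReadySketch(client, pod))
}
```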
// nodeRunningOutdatedKubelet returns true if the kubeletVersion reported
// NodeRunningOutdatedKubelet returns true if the kubeletVersion reported
// in the nodeInfo of the given node is "outdated", meaning < 1.2.0.
// Older versions were inflexible and modifying pod.Status directly through
// the apiserver would result in unexpected outcomes.
func nodeRunningOutdatedKubelet(node *v1.Node) bool {
func NodeRunningOutdatedKubelet(node *v1.Node) bool {
v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.Errorf("couldn't parse version %q of node %v", node.Status.NodeInfo.KubeletVersion, err)
@ -250,7 +212,9 @@ func nodeRunningOutdatedKubelet(node *v1.Node) bool {
return false
}
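
The 1.2.0 cut-off is a plain semantic-version comparison on the kubelet version string the node reports; a standalone sketch using the same //pkg/util/version helpers that appear in this file:

```go
package main

import (
	"fmt"

	utilversion "k8s.io/kubernetes/pkg/util/version"
)

func main() {
	// podStatusReconciliationVersion from the var block above.
	cutoff := utilversion.MustParseSemantic("v1.2.0")

	for _, kubeletVersion := range []string{"v1.1.7", "v1.2.0", "v1.7.3"} {
		v, err := utilversion.ParseSemantic(kubeletVersion)
		if err != nil {
			// Unparseable versions are logged by the controller; just skip here.
			fmt.Printf("%s: unparseable: %v\n", kubeletVersion, err)
			continue
		}
		// Anything below the cutoff cannot have pod.Status flipped safely.
		fmt.Printf("%s outdated: %v\n", kubeletVersion, v.LessThan(cutoff))
	}
}
```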
func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
// NodeExistsInCloudProvider returns true if the node exists in the
// cloud provider.
func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
@ -264,7 +228,8 @@ func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.Nod
return true, nil
}
func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
// RecordNodeEvent records a event related to a node.
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
ref := &v1.ObjectReference{
Kind: "Node",
Name: nodeName,
@ -275,21 +240,23 @@ func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}
func recordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) {
// RecordNodeStatusChange records a event related to a node status change.
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: node.UID,
Namespace: "",
}
glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
glog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, v1.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
}
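
A quick way to see what these helpers emit is client-go's FakeRecorder, which captures formatted events on a channel; a sketch mirroring the ObjectReference shape used above (buffer size and node values are arbitrary):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	recorder := record.NewFakeRecorder(10)
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", UID: "uid-1"}}

	// Same shape as RecordNodeStatusChange: the event is attached to an
	// ObjectReference for the node rather than to the node object itself.
	ref := &v1.ObjectReference{Kind: "Node", Name: node.Name, UID: node.UID}
	recorder.Eventf(ref, v1.EventTypeNormal, "NodeNotReady", "Node %s status is now: %s", node.Name, "NodeNotReady")

	// FakeRecorder publishes "<type> <reason> <message>" strings.
	fmt.Println(<-recorder.Events)
}
```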
// Returns true in case of success and false otherwise
func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
// SwapNodeControllerTaint returns true in case of success and false
// otherwise.
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
taintToAdd.TimeAdded = metav1.Now()
err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintToAdd)
if err != nil {
@ -317,7 +284,8 @@ func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintTo
return true
}
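
Callers pair this helper with the controller's NoExecute taint templates, flipping between a not-ready and an unreachable taint as the node's condition changes. A fragment sketching that call site, meant to sit alongside SwapNodeControllerTaint in this util package (the taint keys are placeholders, not the controller's real template constants):

```go
package util

import (
	"k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// swapToUnreachableSketch marks the node as unreachable instead of
// not-ready. The taint keys are illustrative placeholders only.
func swapToUnreachableSketch(kubeClient clientset.Interface, node *v1.Node) bool {
	unreachable := &v1.Taint{Key: "example.io/unreachable", Effect: v1.TaintEffectNoExecute}
	notReady := &v1.Taint{Key: "example.io/not-ready", Effect: v1.TaintEffectNoExecute}
	// SwapNodeControllerTaint stamps TimeAdded on the taint it adds and
	// removes the other; a false return means the caller should retry.
	return SwapNodeControllerTaint(kubeClient, unreachable, notReady, node)
}
```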
func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
// CreateAddNodeHandler creates an add node handler.
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
obj, err := scheme.Scheme.DeepCopy(originalObj)
if err != nil {
@ -332,7 +300,8 @@ func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
}
}
func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
// CreateUpdateNodeHandler creates a node update handler.
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
return func(origOldObj, origNewObj interface{}) {
oldObj, err := scheme.Scheme.DeepCopy(origOldObj)
if err != nil {
@ -353,7 +322,8 @@ func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb
}
}
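
These factories let the controller register plain func(*v1.Node) callbacks on the shared node informer, with the DeepCopy and type-assertion boilerplate handled once. Wiring them up looks roughly like this (package aliases and the informer argument are assumptions; see also the delete handler introduced just below):

```go
package main

import (
	"github.com/golang/glog"
	v1 "k8s.io/api/core/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/controller/node/util"
)

// registerNodeHandlersSketch shows how the Create*NodeHandler helpers
// plug into a shared informer; nodeInformer would come from the
// controller's informer factory.
func registerNodeHandlersSketch(nodeInformer coreinformers.NodeInformer) {
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
			glog.Infof("observed new node %s", node.Name)
			return nil
		}),
		UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
			glog.Infof("node %s updated", newNode.Name)
			return nil
		}),
	})
}

func main() {} // handlers are registered by the controller at startup
```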
func createDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
// CreateDeleteNodeHandler creates a delete node handler.
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
obj, err := scheme.Scheme.DeepCopy(originalObj)
if err != nil {