commit 68909b5683
@@ -16,6 +16,12 @@ limitations under the License.
 
 package kubelet
 
+import "errors"
+
 const (
 	NetworkNotReadyErrorMsg = "network is not ready"
 )
+
+var (
+	ErrNetworkUnknown = errors.New("network state unknown")
+)
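Exporting the sentinel lets callers test for the unknown-network condition by identity rather than by matching message text. A minimal sketch of that pattern; the checkNetwork helper below is hypothetical and redeclares the sentinel only to keep the example self-contained:

package main

import (
	"errors"
	"fmt"
)

// Stand-in for the sentinel added in this commit.
var ErrNetworkUnknown = errors.New("network state unknown")

// checkNetwork is a hypothetical helper reporting network readiness.
func checkNetwork(ready bool) error {
	if !ready {
		return ErrNetworkUnknown
	}
	return nil
}

func main() {
	if err := checkNetwork(false); errors.Is(err, ErrNetworkUnknown) {
		// Callers can branch on the sentinel by identity,
		// not by comparing error message strings.
		fmt.Println("network state not yet known:", err)
	}
}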
@@ -1563,9 +1563,9 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
 	}
 
 	// If the network plugin is not ready, only start the pod if it uses the host network
-	if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
-		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
-		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
+	if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
+		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
+		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
 	}
 
 	// Create Cgroups for the pod and apply resource parameters
@@ -1820,8 +1820,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 	)
 	duration := base
 	for {
-		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
-			klog.Infof("skipping pod synchronization - %v", rs)
+		if err := kl.runtimeState.runtimeErrors(); err != nil {
+			klog.Infof("skipping pod synchronization - %v", err)
 			// exponential backoff
 			time.Sleep(duration)
 			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
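The last line of this hunk is the entire backoff policy: each skipped sync multiplies the sleep interval by factor and clamps it at max. A standalone sketch of the arithmetic; the base, max, and factor values here are illustrative assumptions, not quoted from this file:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Illustrative values; the kubelet defines its own constants.
	base := 100 * time.Millisecond
	max := 5 * time.Second
	factor := 2.0

	duration := base
	for i := 0; i < 8; i++ {
		fmt.Println(duration) // 100ms, 200ms, 400ms, ..., then pinned at 5s
		duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
	}
}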
@@ -18,6 +18,7 @@ go_library(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/cloud-provider:go_default_library",
@@ -29,6 +29,7 @@ import (
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/errors"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	cloudprovider "k8s.io/cloud-provider"
@@ -438,8 +439,8 @@ func GoRuntime() Setter {
 // ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node.
 func ReadyCondition(
 	nowFunc func() time.Time, // typically Kubelet.clock.Now
-	runtimeErrorsFunc func() []string, // typically Kubelet.runtimeState.runtimeErrors
-	networkErrorsFunc func() []string, // typically Kubelet.runtimeState.networkErrors
+	runtimeErrorsFunc func() error, // typically Kubelet.runtimeState.runtimeErrors
+	networkErrorsFunc func() error, // typically Kubelet.runtimeState.networkErrors
 	appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator
 	cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
 	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
@@ -456,7 +457,7 @@ func ReadyCondition(
 			Message:           "kubelet is posting ready status",
 			LastHeartbeatTime: currentTime,
 		}
-		rs := append(runtimeErrorsFunc(), networkErrorsFunc()...)
+		errs := []error{runtimeErrorsFunc(), networkErrorsFunc()}
 		requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
 		if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
 			requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
@@ -468,14 +469,14 @@ func ReadyCondition(
 			}
 		}
 		if len(missingCapacities) > 0 {
-			rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
+			errs = append(errs, fmt.Errorf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
 		}
-		if len(rs) > 0 {
+		if aggregatedErr := errors.NewAggregate(errs); aggregatedErr != nil {
 			newNodeReadyCondition = v1.NodeCondition{
 				Type:              v1.NodeReady,
 				Status:            v1.ConditionFalse,
 				Reason:            "KubeletNotReady",
-				Message:           strings.Join(rs, ","),
+				Message:           aggregatedErr.Error(),
 				LastHeartbeatTime: currentTime,
 			}
 		}
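The nil check on errors.NewAggregate is what makes it safe to build the errs slice unconditionally: NewAggregate filters out nil entries, returns nil when none remain, and joins multiple messages as a bracketed list, which is the format the updated test below expects. A small sketch of those semantics against k8s.io/apimachinery:

package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	// nil entries are filtered out; an all-nil slice aggregates to nil.
	fmt.Println(utilerrors.NewAggregate([]error{nil, nil}) == nil) // true

	// A single error keeps its own message.
	one := utilerrors.NewAggregate([]error{errors.New("runtime")})
	fmt.Println(one.Error()) // runtime

	// Multiple errors are joined into a bracketed, comma-separated list.
	many := utilerrors.NewAggregate([]error{
		errors.New("runtime"),
		errors.New("network"),
	})
	fmt.Println(many.Error()) // [runtime, network]
}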
@@ -17,6 +17,7 @@ limitations under the License.
 package nodestatus
 
 import (
+	"errors"
 	"fmt"
 	"net"
 	"sort"
@@ -890,8 +891,8 @@ func TestReadyCondition(t *testing.T) {
 	cases := []struct {
 		desc                     string
 		node                     *v1.Node
-		runtimeErrors            []string
-		networkErrors            []string
+		runtimeErrors            error
+		networkErrors            error
 		appArmorValidateHostFunc func() error
 		cmStatus                 cm.Status
 		expectConditions         []v1.NodeCondition
@@ -926,24 +927,12 @@ func TestReadyCondition(t *testing.T) {
 			},
 			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. WARNING: foo", now, now)},
 		},
-		{
-			desc:             "new, not ready: runtime errors",
-			node:             withCapacity.DeepCopy(),
-			runtimeErrors:    []string{"foo", "bar"},
-			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)},
-		},
-		{
-			desc:             "new, not ready: network errors",
-			node:             withCapacity.DeepCopy(),
-			networkErrors:    []string{"foo", "bar"},
-			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)},
-		},
 		{
 			desc:             "new, not ready: runtime and network errors",
 			node:             withCapacity.DeepCopy(),
-			runtimeErrors:    []string{"runtime"},
-			networkErrors:    []string{"network"},
-			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "runtime,network", now, now)},
+			runtimeErrors:    errors.New("runtime"),
+			networkErrors:    errors.New("network"),
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "[runtime, network]", now, now)},
 		},
 		{
 			desc: "new, not ready: missing capacities",
@@ -973,7 +962,7 @@ func TestReadyCondition(t *testing.T) {
 				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(true, "", before, before)}
 				return node
 			}(),
-			runtimeErrors:    []string{"foo"},
+			runtimeErrors:    errors.New("foo"),
 			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", now, now)},
 			expectEvents: []testEvent{
 				{
@@ -999,17 +988,17 @@ func TestReadyCondition(t *testing.T) {
 				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(false, "", before, before)}
 				return node
 			}(),
-			runtimeErrors:    []string{"foo"},
+			runtimeErrors:    errors.New("foo"),
 			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", before, now)},
 			expectEvents:     []testEvent{},
 		},
 	}
 	for _, tc := range cases {
 		t.Run(tc.desc, func(t *testing.T) {
-			runtimeErrorsFunc := func() []string {
+			runtimeErrorsFunc := func() error {
 				return tc.runtimeErrors
 			}
-			networkErrorsFunc := func() []string {
+			networkErrorsFunc := func() error {
 				return tc.networkErrors
 			}
 			cmStatusFunc := func() cm.Status {
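Because ReadyCondition receives runtimeErrorsFunc and networkErrorsFunc as plain func() error values, each test case can inject canned errors through closures without constructing a real runtimeState. A simplified illustration of that injection pattern; readiness and its names are stand-ins, not the actual setter:

package main

import (
	"errors"
	"fmt"
)

// readiness mimics the shape of ReadyCondition's dependencies: it consumes
// error-returning closures instead of concrete kubelet state.
func readiness(runtimeErrorsFunc, networkErrorsFunc func() error) string {
	if err := runtimeErrorsFunc(); err != nil {
		return "NotReady: " + err.Error()
	}
	if err := networkErrorsFunc(); err != nil {
		return "NotReady: " + err.Error()
	}
	return "Ready"
}

func main() {
	// A test case injects fixed errors through closures, no real state needed.
	runtimeErrs := func() error { return errors.New("runtime") }
	networkErrs := func() error { return nil }
	fmt.Println(readiness(runtimeErrs, networkErrs)) // NotReady: runtime
}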
@@ -17,9 +17,12 @@ limitations under the License.
 package kubelet
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"time"
+
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 )
 
 type runtimeState struct {
@@ -70,32 +73,32 @@ func (s *runtimeState) podCIDR() string {
 	return s.cidr
 }
 
-func (s *runtimeState) runtimeErrors() []string {
+func (s *runtimeState) runtimeErrors() error {
 	s.RLock()
 	defer s.RUnlock()
-	var ret []string
+	errs := []error{}
 	if s.lastBaseRuntimeSync.IsZero() {
-		ret = append(ret, "container runtime status check may not have completed yet")
+		errs = append(errs, errors.New("container runtime status check may not have completed yet."))
 	} else if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
-		ret = append(ret, "container runtime is down")
+		errs = append(errs, errors.New("container runtime is down."))
 	}
 	for _, hc := range s.healthChecks {
 		if ok, err := hc.fn(); !ok {
-			ret = append(ret, fmt.Sprintf("%s is not healthy: %v", hc.name, err))
+			errs = append(errs, fmt.Errorf("%s is not healthy: %v.", hc.name, err))
 		}
 	}
 
-	return ret
+	return utilerrors.NewAggregate(errs)
 }
 
-func (s *runtimeState) networkErrors() []string {
+func (s *runtimeState) networkErrors() error {
 	s.RLock()
 	defer s.RUnlock()
-	var ret []string
+	errs := []error{}
 	if s.networkError != nil {
-		ret = append(ret, s.networkError.Error())
+		errs = append(errs, s.networkError)
 	}
-	return ret
+	return utilerrors.NewAggregate(errs)
 }
 
 func newRuntimeState(
@@ -104,6 +107,6 @@ func newRuntimeState(
 	return &runtimeState{
 		lastBaseRuntimeSync:      time.Time{},
 		baseRuntimeSyncThreshold: runtimeSyncThreshold,
-		networkError:             fmt.Errorf("network state unknown"),
+		networkError:             ErrNetworkUnknown,
 	}
 }
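Seeding networkError with the exported sentinel preserves the old behavior (a fresh runtimeState reports "not ready" until the first successful sync) while giving the value a stable identity. A hedged sketch of that lifecycle; netState, newNetState, and setNetworkState are simplified stand-ins for the unexported kubelet types, not its actual API:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var ErrNetworkUnknown = errors.New("network state unknown")

// netState imitates the relevant slice of runtimeState: the network error
// starts out as the sentinel and is cleared once a sync reports success.
type netState struct {
	sync.RWMutex
	networkError error
}

func newNetState() *netState {
	// Seed with the sentinel so "no report yet" differs from "healthy".
	return &netState{networkError: ErrNetworkUnknown}
}

func (s *netState) setNetworkState(err error) {
	s.Lock()
	defer s.Unlock()
	s.networkError = err
}

func (s *netState) networkErrors() error {
	s.RLock()
	defer s.RUnlock()
	return s.networkError
}

func main() {
	s := newNetState()
	fmt.Println(s.networkErrors()) // network state unknown
	s.setNetworkState(nil)         // a successful sync clears the error
	fmt.Println(s.networkErrors()) // <nil>
}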
@@ -89,7 +89,7 @@ type Reconciler interface {
 // safely (prevents more than one operation from being triggered on the same
 // volume)
 // mounter - mounter passed in from kubelet, passed down unmount path
-// volumePluginMrg - volume plugin manager passed from kubelet
+// volumePluginMgr - volume plugin manager passed from kubelet
 func NewReconciler(
 	kubeClient clientset.Interface,
 	controllerAttachDetachEnabled bool,