mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #73556 from msau42/triage-72931
Mark volume as in use even when node status didn't change
This commit is contained in:
commit
a684bd5eb1
@ -426,6 +426,23 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
|
|||||||
now := kl.clock.Now()
|
now := kl.clock.Now()
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
|
||||||
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
|
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
|
||||||
|
// We must mark the volumes as ReportedInUse in volume manager's dsw even
|
||||||
|
// if no changes were made to the node status (no volumes were added or removed
|
||||||
|
// from the VolumesInUse list).
|
||||||
|
//
|
||||||
|
// The reason is that on a kubelet restart, the volume manager's dsw is
|
||||||
|
// repopulated and the volume ReportedInUse is initialized to false, while the
|
||||||
|
// VolumesInUse list from the Node object still contains the state from the
|
||||||
|
// previous kubelet instantiation.
|
||||||
|
//
|
||||||
|
// Once the volumes are added to the dsw, the ReportedInUse field needs to be
|
||||||
|
// synced from the VolumesInUse list in the Node.Status.
|
||||||
|
//
|
||||||
|
// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
|
||||||
|
// because it does not have access to the Node object.
|
||||||
|
// This also cannot be populated on node status manager init because the volume
|
||||||
|
// may not have been added to dsw at that time.
|
||||||
|
kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -54,6 +54,7 @@ import (
|
|||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
|
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
||||||
|
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||||
taintutil "k8s.io/kubernetes/pkg/util/taints"
|
taintutil "k8s.io/kubernetes/pkg/util/taints"
|
||||||
"k8s.io/kubernetes/pkg/version"
|
"k8s.io/kubernetes/pkg/version"
|
||||||
@ -1028,6 +1029,213 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
|
|||||||
assert.IsType(t, core.GetActionImpl{}, actions[9])
|
assert.IsType(t, core.GetActionImpl{}, actions[9])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdateNodeStatusAndVolumesInUseWithoutNodeLease(t *testing.T) {
|
||||||
|
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, false)()
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
desc string
|
||||||
|
existingVolumes []v1.UniqueVolumeName // volumes to initially populate volumeManager
|
||||||
|
existingNode *v1.Node // existing node object
|
||||||
|
expectedNode *v1.Node // new node object after patch
|
||||||
|
expectedReportedInUse []v1.UniqueVolumeName // expected volumes reported in use in volumeManager
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "no volumes and no update",
|
||||||
|
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
|
||||||
|
expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "volumes inuse on node and volumeManager",
|
||||||
|
existingVolumes: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
existingNode: &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedNode: &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedReportedInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "volumes inuse on node but not in volumeManager",
|
||||||
|
existingNode: &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "volumes inuse in volumeManager but not on node",
|
||||||
|
existingVolumes: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
|
||||||
|
expectedNode: &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedReportedInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
// Setup
|
||||||
|
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||||
|
defer testKubelet.Cleanup()
|
||||||
|
|
||||||
|
kubelet := testKubelet.kubelet
|
||||||
|
kubelet.kubeClient = nil // ensure only the heartbeat client is used
|
||||||
|
kubelet.containerManager = &localCM{ContainerManager: cm.NewStubContainerManager()}
|
||||||
|
kubelet.lastStatusReportTime = kubelet.clock.Now()
|
||||||
|
kubelet.nodeStatusReportFrequency = time.Hour
|
||||||
|
kubelet.machineInfo = &cadvisorapi.MachineInfo{}
|
||||||
|
|
||||||
|
// override test volumeManager
|
||||||
|
fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes)
|
||||||
|
kubelet.volumeManager = fakeVolumeManager
|
||||||
|
|
||||||
|
// Only test VolumesInUse setter
|
||||||
|
kubelet.setNodeStatusFuncs = []func(*v1.Node) error{
|
||||||
|
nodestatus.VolumesInUse(kubelet.volumeManager.ReconcilerStatesHasBeenSynced,
|
||||||
|
kubelet.volumeManager.GetVolumesInUse),
|
||||||
|
}
|
||||||
|
|
||||||
|
kubeClient := testKubelet.fakeKubeClient
|
||||||
|
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain
|
||||||
|
|
||||||
|
// Execute
|
||||||
|
assert.NoError(t, kubelet.updateNodeStatus())
|
||||||
|
|
||||||
|
// Validate
|
||||||
|
actions := kubeClient.Actions()
|
||||||
|
if tc.expectedNode != nil {
|
||||||
|
assert.Len(t, actions, 2)
|
||||||
|
assert.IsType(t, core.GetActionImpl{}, actions[0])
|
||||||
|
assert.IsType(t, core.PatchActionImpl{}, actions[1])
|
||||||
|
patchAction := actions[1].(core.PatchActionImpl)
|
||||||
|
|
||||||
|
updatedNode, err := applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedNode, updatedNode), "%s", diff.ObjectDiff(tc.expectedNode, updatedNode))
|
||||||
|
} else {
|
||||||
|
assert.Len(t, actions, 1)
|
||||||
|
assert.IsType(t, core.GetActionImpl{}, actions[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
reportedInUse := fakeVolumeManager.GetVolumesReportedInUse()
|
||||||
|
assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedReportedInUse, reportedInUse), "%s", diff.ObjectDiff(tc.expectedReportedInUse, reportedInUse))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
|
||||||
|
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, true)()
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
desc string
|
||||||
|
existingVolumes []v1.UniqueVolumeName // volumes to initially populate volumeManager
|
||||||
|
existingNode *v1.Node // existing node object
|
||||||
|
expectedNode *v1.Node // new node object after patch
|
||||||
|
expectedReportedInUse []v1.UniqueVolumeName // expected volumes reported in use in volumeManager
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "no volumes and no update",
|
||||||
|
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "volumes inuse on node and volumeManager",
|
||||||
|
existingVolumes: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
existingNode: &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedReportedInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "volumes inuse on node but not in volumeManager",
|
||||||
|
existingNode: &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "volumes inuse in volumeManager but not on node",
|
||||||
|
existingVolumes: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
|
||||||
|
expectedNode: &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedReportedInUse: []v1.UniqueVolumeName{"vol1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
// Setup
|
||||||
|
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||||
|
defer testKubelet.Cleanup()
|
||||||
|
|
||||||
|
kubelet := testKubelet.kubelet
|
||||||
|
kubelet.kubeClient = nil // ensure only the heartbeat client is used
|
||||||
|
kubelet.containerManager = &localCM{ContainerManager: cm.NewStubContainerManager()}
|
||||||
|
kubelet.lastStatusReportTime = kubelet.clock.Now()
|
||||||
|
kubelet.nodeStatusReportFrequency = time.Hour
|
||||||
|
kubelet.machineInfo = &cadvisorapi.MachineInfo{}
|
||||||
|
|
||||||
|
// override test volumeManager
|
||||||
|
fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes)
|
||||||
|
kubelet.volumeManager = fakeVolumeManager
|
||||||
|
|
||||||
|
// Only test VolumesInUse setter
|
||||||
|
kubelet.setNodeStatusFuncs = []func(*v1.Node) error{
|
||||||
|
nodestatus.VolumesInUse(kubelet.volumeManager.ReconcilerStatesHasBeenSynced,
|
||||||
|
kubelet.volumeManager.GetVolumesInUse),
|
||||||
|
}
|
||||||
|
|
||||||
|
kubeClient := testKubelet.fakeKubeClient
|
||||||
|
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain
|
||||||
|
|
||||||
|
// Execute
|
||||||
|
assert.NoError(t, kubelet.updateNodeStatus())
|
||||||
|
|
||||||
|
// Validate
|
||||||
|
actions := kubeClient.Actions()
|
||||||
|
if tc.expectedNode != nil {
|
||||||
|
assert.Len(t, actions, 2)
|
||||||
|
assert.IsType(t, core.GetActionImpl{}, actions[0])
|
||||||
|
assert.IsType(t, core.PatchActionImpl{}, actions[1])
|
||||||
|
patchAction := actions[1].(core.PatchActionImpl)
|
||||||
|
|
||||||
|
updatedNode, err := applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedNode, updatedNode), "%s", diff.ObjectDiff(tc.expectedNode, updatedNode))
|
||||||
|
} else {
|
||||||
|
assert.Len(t, actions, 1)
|
||||||
|
assert.IsType(t, core.GetActionImpl{}, actions[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
reportedInUse := fakeVolumeManager.GetVolumesReportedInUse()
|
||||||
|
assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedReportedInUse, reportedInUse), "%s", diff.ObjectDiff(tc.expectedReportedInUse, reportedInUse))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRegisterWithApiServer(t *testing.T) {
|
func TestRegisterWithApiServer(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||||
defer testKubelet.Cleanup()
|
defer testKubelet.Cleanup()
|
||||||
|
@ -8,7 +8,10 @@ load(
|
|||||||
|
|
||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = ["volume_manager.go"],
|
srcs = [
|
||||||
|
"volume_manager.go",
|
||||||
|
"volume_manager_fake.go",
|
||||||
|
],
|
||||||
importpath = "k8s.io/kubernetes/pkg/kubelet/volumemanager",
|
importpath = "k8s.io/kubernetes/pkg/kubelet/volumemanager",
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/kubelet/config:go_default_library",
|
"//pkg/kubelet/config:go_default_library",
|
||||||
|
99
pkg/kubelet/volumemanager/volume_manager_fake.go
Normal file
99
pkg/kubelet/volumemanager/volume_manager_fake.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package volumemanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
|
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FakeVolumeManager is a test implementation that just tracks calls
|
||||||
|
type FakeVolumeManager struct {
|
||||||
|
volumes map[v1.UniqueVolumeName]bool
|
||||||
|
reportedInUse map[v1.UniqueVolumeName]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFakeVolumeManager creates a new VolumeManager test instance
|
||||||
|
func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManager {
|
||||||
|
volumes := map[v1.UniqueVolumeName]bool{}
|
||||||
|
for _, v := range initialVolumes {
|
||||||
|
volumes[v] = true
|
||||||
|
}
|
||||||
|
return &FakeVolumeManager{
|
||||||
|
volumes: volumes,
|
||||||
|
reportedInUse: map[v1.UniqueVolumeName]bool{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run is not implemented
|
||||||
|
func (f *FakeVolumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForAttachAndMount is not implemented
|
||||||
|
func (f *FakeVolumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMountedVolumesForPod is not implemented
|
||||||
|
func (f *FakeVolumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetExtraSupplementalGroupsForPod is not implemented
|
||||||
|
func (f *FakeVolumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetVolumesInUse returns a list of the initial volumes
|
||||||
|
func (f *FakeVolumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
|
||||||
|
inuse := []v1.UniqueVolumeName{}
|
||||||
|
for v := range f.volumes {
|
||||||
|
inuse = append(inuse, v)
|
||||||
|
}
|
||||||
|
return inuse
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReconcilerStatesHasBeenSynced is not implemented
|
||||||
|
func (f *FakeVolumeManager) ReconcilerStatesHasBeenSynced() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// VolumeIsAttached is not implemented
|
||||||
|
func (f *FakeVolumeManager) VolumeIsAttached(volumeName v1.UniqueVolumeName) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkVolumesAsReportedInUse adds the given volumes to the reportedInUse map
|
||||||
|
func (f *FakeVolumeManager) MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName) {
|
||||||
|
for _, reportedVolume := range volumesReportedAsInUse {
|
||||||
|
if _, ok := f.volumes[reportedVolume]; ok {
|
||||||
|
f.reportedInUse[reportedVolume] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetVolumesReportedInUse is a test function only that returns a list of volumes
|
||||||
|
// from the reportedInUse map
|
||||||
|
func (f *FakeVolumeManager) GetVolumesReportedInUse() []v1.UniqueVolumeName {
|
||||||
|
inuse := []v1.UniqueVolumeName{}
|
||||||
|
for reportedVolume := range f.reportedInUse {
|
||||||
|
inuse = append(inuse, reportedVolume)
|
||||||
|
}
|
||||||
|
return inuse
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user