From 62f874b19b579f4cda25e97da6aaffa74d6c6e5e Mon Sep 17 00:00:00 2001 From: Michelle Au Date: Wed, 30 Jan 2019 17:33:51 -0800 Subject: [PATCH 1/2] Mark volume as in use even when node status didn't change --- pkg/kubelet/kubelet_node_status.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index bc86e4d0060..ab276ad0e5d 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -427,6 +427,23 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { now := kl.clock.Now() if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) { if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) { + // We must mark the volumes as ReportedInUse in volume manager's dsw even + // if no changes were made to the node status (no volumes were added or removed + // from the VolumesInUse list). + // + // The reason is that on a kubelet restart, the volume manager's dsw is + // repopulated and the volume ReportedInUse is initialized to false, while the + // VolumesInUse list from the Node object still contains the state from the + // previous kubelet instantiation. + // + // Once the volumes are added to the dsw, the ReportedInUse field needs to be + // synced from the VolumesInUse list in the Node.Status. + // + // The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly + // because it does not have access to the Node object. + // This also cannot be populated on node status manager init because the volume + // may not have been added to dsw at that time. + kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse) return nil } } From 80a2698a02ae5475f7dc5ef889c2724d2a6a0406 Mon Sep 17 00:00:00 2001 From: Michelle Au Date: Fri, 1 Feb 2019 18:20:31 -0800 Subject: [PATCH 2/2] Add unit tests for volumesinuse during node status update --- pkg/kubelet/kubelet_node_status_test.go | 208 ++++++++++++++++++ pkg/kubelet/volumemanager/BUILD | 5 +- .../volumemanager/volume_manager_fake.go | 99 +++++++++ 3 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 pkg/kubelet/volumemanager/volume_manager_fake.go diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index ceb76205aaa..312992f49ea 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -55,6 +55,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/nodestatus" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" + kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" taintutil "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/pkg/version" @@ -1029,6 +1030,213 @@ func TestUpdateNodeStatusWithLease(t *testing.T) { assert.IsType(t, core.GetActionImpl{}, actions[9]) } +func TestUpdateNodeStatusAndVolumesInUseWithoutNodeLease(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, false)() + + cases := []struct { + desc string + existingVolumes []v1.UniqueVolumeName // volumes to initially populate volumeManager + existingNode *v1.Node // existing node object + expectedNode *v1.Node // new node object after patch + expectedReportedInUse []v1.UniqueVolumeName // expected volumes reported in use in volumeManager + }{ + { + desc: "no volumes and no update", + existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}, + expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}, + }, + { + desc: "volumes inuse on node and volumeManager", + existingVolumes: []v1.UniqueVolumeName{"vol1"}, + existingNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"vol1"}, + }, + }, + expectedNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"vol1"}, + }, + }, + expectedReportedInUse: []v1.UniqueVolumeName{"vol1"}, + }, + { + desc: "volumes inuse on node but not in volumeManager", + existingNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"vol1"}, + }, + }, + expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}, + }, + { + desc: "volumes inuse in volumeManager but not on node", + existingVolumes: []v1.UniqueVolumeName{"vol1"}, + existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}, + expectedNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"vol1"}, + }, + }, + expectedReportedInUse: []v1.UniqueVolumeName{"vol1"}, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + // Setup + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + + kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used + kubelet.containerManager = &localCM{ContainerManager: cm.NewStubContainerManager()} + kubelet.lastStatusReportTime = kubelet.clock.Now() + kubelet.nodeStatusReportFrequency = time.Hour + kubelet.machineInfo = &cadvisorapi.MachineInfo{} + + // override test volumeManager + fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes) + kubelet.volumeManager = fakeVolumeManager + + // Only test VolumesInUse setter + kubelet.setNodeStatusFuncs = []func(*v1.Node) error{ + nodestatus.VolumesInUse(kubelet.volumeManager.ReconcilerStatesHasBeenSynced, + kubelet.volumeManager.GetVolumesInUse), + } + + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain + + // Execute + assert.NoError(t, kubelet.updateNodeStatus()) + + // Validate + actions := kubeClient.Actions() + if tc.expectedNode != nil { + assert.Len(t, actions, 2) + assert.IsType(t, core.GetActionImpl{}, actions[0]) + assert.IsType(t, core.PatchActionImpl{}, actions[1]) + patchAction := actions[1].(core.PatchActionImpl) + + updatedNode, err := applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch()) + require.NoError(t, err) + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedNode, updatedNode), "%s", diff.ObjectDiff(tc.expectedNode, updatedNode)) + } else { + assert.Len(t, actions, 1) + assert.IsType(t, core.GetActionImpl{}, actions[0]) + } + + reportedInUse := fakeVolumeManager.GetVolumesReportedInUse() + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedReportedInUse, reportedInUse), "%s", diff.ObjectDiff(tc.expectedReportedInUse, reportedInUse)) + }) + } +} + +func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, true)() + + cases := []struct { + desc string + existingVolumes []v1.UniqueVolumeName // volumes to initially populate volumeManager + existingNode *v1.Node // existing node object + expectedNode *v1.Node // new node object after patch + expectedReportedInUse []v1.UniqueVolumeName // expected volumes reported in use in volumeManager + }{ + { + desc: "no volumes and no update", + existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}, + }, + { + desc: "volumes inuse on node and volumeManager", + existingVolumes: []v1.UniqueVolumeName{"vol1"}, + existingNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"vol1"}, + }, + }, + expectedReportedInUse: []v1.UniqueVolumeName{"vol1"}, + }, + { + desc: "volumes inuse on node but not in volumeManager", + existingNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"vol1"}, + }, + }, + expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}, + }, + { + desc: "volumes inuse in volumeManager but not on node", + existingVolumes: []v1.UniqueVolumeName{"vol1"}, + existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}, + expectedNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"vol1"}, + }, + }, + expectedReportedInUse: []v1.UniqueVolumeName{"vol1"}, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + // Setup + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + + kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used + kubelet.containerManager = &localCM{ContainerManager: cm.NewStubContainerManager()} + kubelet.lastStatusReportTime = kubelet.clock.Now() + kubelet.nodeStatusReportFrequency = time.Hour + kubelet.machineInfo = &cadvisorapi.MachineInfo{} + + // override test volumeManager + fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes) + kubelet.volumeManager = fakeVolumeManager + + // Only test VolumesInUse setter + kubelet.setNodeStatusFuncs = []func(*v1.Node) error{ + nodestatus.VolumesInUse(kubelet.volumeManager.ReconcilerStatesHasBeenSynced, + kubelet.volumeManager.GetVolumesInUse), + } + + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain + + // Execute + assert.NoError(t, kubelet.updateNodeStatus()) + + // Validate + actions := kubeClient.Actions() + if tc.expectedNode != nil { + assert.Len(t, actions, 2) + assert.IsType(t, core.GetActionImpl{}, actions[0]) + assert.IsType(t, core.PatchActionImpl{}, actions[1]) + patchAction := actions[1].(core.PatchActionImpl) + + updatedNode, err := applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch()) + require.NoError(t, err) + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedNode, updatedNode), "%s", diff.ObjectDiff(tc.expectedNode, updatedNode)) + } else { + assert.Len(t, actions, 1) + assert.IsType(t, core.GetActionImpl{}, actions[0]) + } + + reportedInUse := fakeVolumeManager.GetVolumesReportedInUse() + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectedReportedInUse, reportedInUse), "%s", diff.ObjectDiff(tc.expectedReportedInUse, reportedInUse)) + }) + } +} + func TestRegisterWithApiServer(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() diff --git a/pkg/kubelet/volumemanager/BUILD b/pkg/kubelet/volumemanager/BUILD index 269eb189192..f967754b53f 100644 --- a/pkg/kubelet/volumemanager/BUILD +++ b/pkg/kubelet/volumemanager/BUILD @@ -8,7 +8,10 @@ load( go_library( name = "go_default_library", - srcs = ["volume_manager.go"], + srcs = [ + "volume_manager.go", + "volume_manager_fake.go", + ], importpath = "k8s.io/kubernetes/pkg/kubelet/volumemanager", deps = [ "//pkg/kubelet/config:go_default_library", diff --git a/pkg/kubelet/volumemanager/volume_manager_fake.go b/pkg/kubelet/volumemanager/volume_manager_fake.go new file mode 100644 index 00000000000..b21d34a14ce --- /dev/null +++ b/pkg/kubelet/volumemanager/volume_manager_fake.go @@ -0,0 +1,99 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumemanager + +import ( + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/config" + "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/volume/util/types" +) + +// FakeVolumeManager is a test implementation that just tracks calls +type FakeVolumeManager struct { + volumes map[v1.UniqueVolumeName]bool + reportedInUse map[v1.UniqueVolumeName]bool +} + +// NewFakeVolumeManager creates a new VolumeManager test instance +func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManager { + volumes := map[v1.UniqueVolumeName]bool{} + for _, v := range initialVolumes { + volumes[v] = true + } + return &FakeVolumeManager{ + volumes: volumes, + reportedInUse: map[v1.UniqueVolumeName]bool{}, + } +} + +// Run is not implemented +func (f *FakeVolumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { +} + +// WaitForAttachAndMount is not implemented +func (f *FakeVolumeManager) WaitForAttachAndMount(pod *v1.Pod) error { + return nil +} + +// GetMountedVolumesForPod is not implemented +func (f *FakeVolumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap { + return nil +} + +// GetExtraSupplementalGroupsForPod is not implemented +func (f *FakeVolumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 { + return nil +} + +// GetVolumesInUse returns a list of the initial volumes +func (f *FakeVolumeManager) GetVolumesInUse() []v1.UniqueVolumeName { + inuse := []v1.UniqueVolumeName{} + for v := range f.volumes { + inuse = append(inuse, v) + } + return inuse +} + +// ReconcilerStatesHasBeenSynced is not implemented +func (f *FakeVolumeManager) ReconcilerStatesHasBeenSynced() bool { + return true +} + +// VolumeIsAttached is not implemented +func (f *FakeVolumeManager) VolumeIsAttached(volumeName v1.UniqueVolumeName) bool { + return false +} + +// MarkVolumesAsReportedInUse adds the given volumes to the reportedInUse map +func (f *FakeVolumeManager) MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName) { + for _, reportedVolume := range volumesReportedAsInUse { + if _, ok := f.volumes[reportedVolume]; ok { + f.reportedInUse[reportedVolume] = true + } + } +} + +// GetVolumesReportedInUse is a test function only that returns a list of volumes +// from the reportedInUse map +func (f *FakeVolumeManager) GetVolumesReportedInUse() []v1.UniqueVolumeName { + inuse := []v1.UniqueVolumeName{} + for reportedVolume := range f.reportedInUse { + inuse = append(inuse, reportedVolume) + } + return inuse +}