change kubelet nodeInfo to nodeLister

Alex Wang 2019-10-24 22:46:59 +08:00
parent d5cdc097d9
commit b1a3fb4988
6 changed files with 31 additions and 43 deletions


@@ -95,7 +95,6 @@ go_library(
         "//pkg/kubelet/util/queue:go_default_library",
         "//pkg/kubelet/util/sliceutils:go_default_library",
         "//pkg/kubelet/volumemanager:go_default_library",
-        "//pkg/scheduler/algorithm/predicates:go_default_library",
         "//pkg/scheduler/api:go_default_library",
         "//pkg/security/apparmor:go_default_library",
         "//pkg/security/podsecuritypolicy/sysctl:go_default_library",
@@ -240,6 +239,7 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/client-go/testing:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",


@@ -34,7 +34,6 @@ import (
     cadvisorapi "github.com/google/cadvisor/info/v1"
     v1 "k8s.io/api/core/v1"
-    apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/labels"
@@ -106,7 +105,6 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/util/queue"
     "k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
     "k8s.io/kubernetes/pkg/kubelet/volumemanager"
-    "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
     "k8s.io/kubernetes/pkg/security/apparmor"
     sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl"
     utilipt "k8s.io/kubernetes/pkg/util/iptables"
@@ -458,7 +456,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
         go r.Run(wait.NeverStop)
     }
-    nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
+    nodeLister := corelisters.NewNodeLister(nodeIndexer)

     // TODO: get the real node object of ourself,
     // and use the real node name and UID.
@@ -506,7 +504,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         registerSchedulable: registerSchedulable,
         dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
         serviceLister: serviceLister,
-        nodeInfo: nodeInfo,
+        nodeLister: nodeLister,
         masterServiceNamespace: masterServiceNamespace,
         streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
         recorder: kubeDeps.Recorder,
@@ -959,8 +957,8 @@ type Kubelet struct {
     masterServiceNamespace string
     // serviceLister knows how to list services
     serviceLister serviceLister
-    // nodeInfo knows how to get information about the node for this kubelet.
-    nodeInfo predicates.NodeInfo
+    // nodeLister knows how to list nodes
+    nodeLister corelisters.NodeLister

     // a list of node labels to register
     nodeLabels map[string]string
@@ -2306,23 +2304,3 @@ func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kub
     }
     return config
 }
-
-// CachedNodeInfo implements NodeInfo
-type CachedNodeInfo struct {
-    corelisters.NodeLister
-}
-
-// GetNodeInfo returns cached data for the node name.
-func (c *CachedNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
-    node, err := c.Get(nodeName)
-
-    if apierrors.IsNotFound(err) {
-        return nil, err
-    }
-
-    if err != nil {
-        return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", nodeName, err)
-    }
-
-    return node, nil
-}

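For context, the construction that replaces the deleted wrapper can be exercised on its own. The following is an illustrative sketch, not part of this commit: the empty indexer and the node name "my-node" are stand-ins. It shows that corelisters.NodeLister.Get already returns a standard NotFound error, which is why the error translation in the removed CachedNodeInfo wrapper is no longer needed.

package main

import (
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // In NewMainKubelet this indexer is filled by a reflector watching only
    // the kubelet's own Node object; here it simply starts out empty.
    nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
    nodeLister := corelisters.NewNodeLister(nodeIndexer)

    // "my-node" is a hypothetical node name used only for this sketch.
    if _, err := nodeLister.Get("my-node"); apierrors.IsNotFound(err) {
        fmt.Println("node not in the local cache yet:", err)
    }
}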

@@ -227,7 +227,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
     if kl.kubeClient == nil {
         return kl.initialNode()
     }
-    return kl.nodeInfo.GetNodeInfo(string(kl.nodeName))
+    return kl.nodeLister.Get(string(kl.nodeName))
 }

 // getNodeAnyWay() must return a *v1.Node which is required by RunGeneralPredicates().
@@ -237,7 +237,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
 // zero capacity, and the default labels.
 func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
     if kl.kubeClient != nil {
-        if n, err := kl.nodeInfo.GetNodeInfo(string(kl.nodeName)); err == nil {
+        if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil {
             return n, nil
         }
     }

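GetNode and getNodeAnyWay now read straight from the lister. As a hedged sketch of the caller-side pattern (fetchOwnNode is a hypothetical helper invented for illustration, not part of the kubelet), "not cached yet" can still be told apart from other lister failures with the standard error helpers:

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/types"
    corelisters "k8s.io/client-go/listers/core/v1"
)

// fetchOwnNode mirrors how GetNode uses the lister for this kubelet's node.
func fetchOwnNode(lister corelisters.NodeLister, nodeName types.NodeName) (*v1.Node, error) {
    node, err := lister.Get(string(nodeName))
    if apierrors.IsNotFound(err) {
        // The Node object has not reached the local cache yet; getNodeAnyWay
        // falls back to the locally built initial node in this situation.
        return nil, err
    }
    if err != nil {
        return nil, fmt.Errorf("error retrieving node %q from cache: %v", nodeName, err)
    }
    return node, nil
}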

@@ -30,7 +30,7 @@ import (
 func TestPodResourceLimitsDefaulting(t *testing.T) {
     tk := newTestKubelet(t, true)
     defer tk.Cleanup()
-    tk.kubelet.nodeInfo = &testNodeInfo{
+    tk.kubelet.nodeLister = &testNodeLister{
         nodes: []*v1.Node{
             {
                 ObjectMeta: metav1.ObjectMeta{


@@ -30,12 +30,14 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/clock"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes/fake"
+    corelisters "k8s.io/client-go/listers/core/v1"
     "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/flowcontrol"
     cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
@@ -178,7 +180,7 @@ func newTestKubeletWithImageList(
     kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return true })
     kubelet.masterServiceNamespace = metav1.NamespaceDefault
     kubelet.serviceLister = testServiceLister{}
-    kubelet.nodeInfo = testNodeInfo{
+    kubelet.nodeLister = testNodeLister{
         nodes: []*v1.Node{
             {
                 ObjectMeta: metav1.ObjectMeta{
@@ -426,17 +428,25 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
     fakeRuntime.AssertKilledPods([]string{"12345678"})
 }

-type testNodeInfo struct {
+type testNodeLister struct {
     nodes []*v1.Node
 }

-func (ls testNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
-    for _, node := range ls.nodes {
-        if node.Name == id {
+func (nl testNodeLister) Get(name string) (*v1.Node, error) {
+    for _, node := range nl.nodes {
+        if node.Name == name {
             return node, nil
         }
     }
-    return nil, fmt.Errorf("Node with name: %s does not exist", id)
+    return nil, fmt.Errorf("Node with name: %s does not exist", name)
+}
+
+func (nl testNodeLister) List(_ labels.Selector) (ret []*v1.Node, err error) {
+    return nl.nodes, nil
+}
+
+func (nl testNodeLister) ListWithPredicate(_ corelisters.NodeConditionPredicate) ([]*v1.Node, error) {
+    return nl.nodes, nil
 }

 func checkPodStatus(t *testing.T, kl *Kubelet, pod *v1.Pod, phase v1.PodPhase) {
@@ -451,7 +461,7 @@ func TestHandlePortConflicts(t *testing.T) {
     defer testKubelet.Cleanup()
     kl := testKubelet.kubelet
-    kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{
+    kl.nodeLister = testNodeLister{nodes: []*v1.Node{
         {
             ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
             Status: v1.NodeStatus{
@@ -497,7 +507,7 @@ func TestHandleHostNameConflicts(t *testing.T) {
     defer testKubelet.Cleanup()
     kl := testKubelet.kubelet
-    kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{
+    kl.nodeLister = testNodeLister{nodes: []*v1.Node{
         {
             ObjectMeta: metav1.ObjectMeta{Name: "127.0.0.1"},
             Status: v1.NodeStatus{
@@ -549,7 +559,7 @@ func TestHandleNodeSelector(t *testing.T) {
             },
         },
     }
-    kl.nodeInfo = testNodeInfo{nodes: nodes}
+    kl.nodeLister = testNodeLister{nodes: nodes}

     recorder := record.NewFakeRecorder(20)
     nodeRef := &v1.ObjectReference{
@@ -589,7 +599,7 @@ func TestHandleMemExceeded(t *testing.T) {
             v1.ResourcePods: *resource.NewQuantity(40, resource.DecimalSI),
         }}},
     }
-    kl.nodeInfo = testNodeInfo{nodes: nodes}
+    kl.nodeLister = testNodeLister{nodes: nodes}

     recorder := record.NewFakeRecorder(20)
     nodeRef := &v1.ObjectReference{
@@ -650,7 +660,7 @@ func TestHandlePluginResources(t *testing.T) {
             v1.ResourcePods: allowedPodQuantity,
         }}},
     }
-    kl.nodeInfo = testNodeInfo{nodes: nodes}
+    kl.nodeLister = testNodeLister{nodes: nodes}

     updatePluginResourcesFunc := func(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
         // Maps from resourceName to the value we use to set node.allocatableResource[resourceName].
@@ -1801,7 +1811,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
     defer testKubelet.Cleanup()
     kl := testKubelet.kubelet
-    kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{
+    kl.nodeLister = testNodeLister{nodes: []*v1.Node{
         {
             ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
             Status: v1.NodeStatus{

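A brief usage sketch of the testNodeLister fake defined above, assuming the newTestKubelet helper and the imports already present in this test file; the test name and the seeded node are illustrative and not part of this commit:

func TestGetNodeFromNodeLister(t *testing.T) {
    testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
    defer testKubelet.Cleanup()
    kl := testKubelet.kubelet

    // Seed the fake lister with a node whose name matches this kubelet.
    kl.nodeLister = testNodeLister{nodes: []*v1.Node{
        {ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)}},
    }}

    // GetNode should now resolve through the lister path.
    node, err := kl.GetNode()
    if err != nil {
        t.Fatalf("GetNode() returned unexpected error: %v", err)
    }
    if node.Name != string(kl.nodeName) {
        t.Errorf("GetNode() = %q, want %q", node.Name, kl.nodeName)
    }
}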

@@ -75,7 +75,7 @@ func TestRunOnce(t *testing.T) {
         rootDirectory: basePath,
         recorder: &record.FakeRecorder{},
         cadvisor: cadvisor,
-        nodeInfo: testNodeInfo{},
+        nodeLister: testNodeLister{},
         statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}),
         podManager: podManager,
         os: &containertest.FakeOS{},