Migrate RuntimeClass support to the generated typed client

This commit is contained in:
Tim Allclair 2019-01-23 14:26:02 -08:00
parent c21f60f862
commit aab3523e0e
10 changed files with 86 additions and 114 deletions

View File

@ -123,7 +123,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/authorization/v1beta1:go_default_library",
@ -137,6 +136,7 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/kubelet/config/v1beta1:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/coreos/go-systemd/daemon:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",

View File

@ -48,7 +48,6 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -96,6 +95,7 @@ import (
"k8s.io/kubernetes/pkg/util/rlimit"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/version/verflag"
nodeapiclientset "k8s.io/node-api/pkg/client/clientset/versioned"
"k8s.io/utils/exec"
)
@ -542,12 +542,11 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
switch {
case standaloneMode:
kubeDeps.KubeClient = nil
kubeDeps.DynamicKubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.Warningf("standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil, kubeDeps.DynamicKubeClient == nil:
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
if err != nil {
return err
@ -559,11 +558,6 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
return fmt.Errorf("failed to initialize kubelet client: %v", err)
}
kubeDeps.DynamicKubeClient, err = dynamic.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet dynamic client: %v", err)
}
// make a separate client for events
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
@ -590,12 +584,17 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
}
// CRDs are JSON only, and client renegotiation for streaming is not correct as per #67803
csiClientConfig := restclient.CopyConfig(clientConfig)
csiClientConfig.ContentType = "application/json"
kubeDeps.CSIClient, err = csiclientset.NewForConfig(csiClientConfig)
crdClientConfig := restclient.CopyConfig(clientConfig)
crdClientConfig.ContentType = "application/json"
kubeDeps.CSIClient, err = csiclientset.NewForConfig(crdClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet storage client: %v", err)
}
kubeDeps.NodeAPIClient, err = nodeapiclientset.NewForConfig(crdClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet node-api client: %v", err)
}
}
// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops

View File

@ -132,7 +132,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
@ -143,6 +142,7 @@ go_library(
"//staging/src/k8s.io/client-go/util/integer:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned:go_default_library",
"//third_party/forked/golang/expansion:go_default_library",
"//vendor/github.com/golang/groupcache/lru:go_default_library",
"//vendor/github.com/google/cadvisor/events:go_default_library",

View File

@ -43,7 +43,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
@ -115,6 +114,7 @@ import (
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
nodeapiclientset "k8s.io/node-api/pkg/client/clientset/versioned"
utilexec "k8s.io/utils/exec"
)
@ -249,7 +249,7 @@ type Dependencies struct {
OnHeartbeatFailure func()
KubeClient clientset.Interface
CSIClient csiclientset.Interface
DynamicKubeClient dynamic.Interface
NodeAPIClient nodeapiclientset.Interface
Mounter mount.Interface
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
@ -658,8 +658,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
klet.runtimeService = runtimeService
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.DynamicKubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.DynamicKubeClient)
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.NodeAPIClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.NodeAPIClient)
}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
@ -1426,7 +1426,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
go kl.runtimeClassManager.Run(wait.NeverStop)
kl.runtimeClassManager.Start(wait.NeverStop)
}
// Start the pod lifecycle event generator.

View File

@ -61,8 +61,8 @@ func TestCreatePodSandbox(t *testing.T) {
func TestCreatePodSandbox_RuntimeClass(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RuntimeClass, true)()
rcm := runtimeclass.NewManager(rctest.NewPopulatedDynamicClient())
defer rctest.StartManagerSync(t, rcm)()
rcm := runtimeclass.NewManager(rctest.NewPopulatedClient())
defer rctest.StartManagerSync(rcm)()
fakeRuntime, _, m, err := createTestRuntimeManager()
require.NoError(t, err)

View File

@ -7,12 +7,10 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/informers/externalversions:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/listers/node/v1alpha1:go_default_library",
],
)

View File

@ -20,12 +20,10 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
nodeapiclient "k8s.io/node-api/pkg/client/clientset/versioned"
nodeapiinformer "k8s.io/node-api/pkg/client/informers/externalversions"
nodev1alpha1 "k8s.io/node-api/pkg/client/listers/node/v1alpha1"
)
var (
@ -38,33 +36,36 @@ var (
// Manager caches RuntimeClass API objects, and provides accessors to the Kubelet.
type Manager struct {
informer cache.SharedInformer
informerFactory nodeapiinformer.SharedInformerFactory
lister nodev1alpha1.RuntimeClassLister
}
// NewManager returns a new RuntimeClass Manager. Run must be called before the manager can be used.
func NewManager(client dynamic.Interface) *Manager {
rc := client.Resource(runtimeClassGVR)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return rc.List(options)
},
WatchFunc: rc.Watch,
}
informer := cache.NewSharedInformer(lw, &unstructured.Unstructured{}, 0)
func NewManager(client nodeapiclient.Interface) *Manager {
const resyncPeriod = 0
factory := nodeapiinformer.NewSharedInformerFactory(client, resyncPeriod)
lister := factory.Node().V1alpha1().RuntimeClasses().Lister()
return &Manager{
informer: informer,
informerFactory: factory,
lister: lister,
}
}
// Run starts syncing the RuntimeClass cache with the apiserver.
func (m *Manager) Run(stopCh <-chan struct{}) {
m.informer.Run(stopCh)
// Start starts syncing the RuntimeClass cache with the apiserver.
func (m *Manager) Start(stopCh <-chan struct{}) {
m.informerFactory.Start(stopCh)
}
// WaitForCacheSync exposes the WaitForCacheSync method on the informer factory for testing
// purposes.
func (m *Manager) WaitForCacheSync(stopCh <-chan struct{}) {
m.informerFactory.WaitForCacheSync(stopCh)
}
// LookupRuntimeHandler returns the RuntimeHandler string associated with the given RuntimeClass
// name (or the default of "" for nil). If the RuntimeClass is not found, it returns an
// apierrors.NotFound error.
// errors.NotFound error.
func (m *Manager) LookupRuntimeHandler(runtimeClassName *string) (string, error) {
if runtimeClassName == nil || *runtimeClassName == "" {
// The default RuntimeClass always resolves to the empty runtime handler.
@ -72,26 +73,18 @@ func (m *Manager) LookupRuntimeHandler(runtimeClassName *string) (string, error)
}
name := *runtimeClassName
item, exists, err := m.informer.GetStore().GetByKey(name)
rc, err := m.lister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
return "", err
}
return "", fmt.Errorf("Failed to lookup RuntimeClass %s: %v", name, err)
}
if !exists {
return "", errors.NewNotFound(schema.GroupResource{
Group: runtimeClassGVR.Group,
Resource: runtimeClassGVR.Resource,
}, name)
}
rc, ok := item.(*unstructured.Unstructured)
if !ok {
return "", fmt.Errorf("unexpected RuntimeClass type %T", item)
handler := rc.Spec.RuntimeHandler
if handler == nil {
return "", nil
}
handler, _, err := unstructured.NestedString(rc.Object, "spec", "runtimeHandler")
if err != nil {
return "", fmt.Errorf("Invalid RuntimeClass object: %v", err)
}
return handler, nil
return *handler, nil
}

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
rctest "k8s.io/kubernetes/pkg/kubelet/runtimeclass/testing"
"k8s.io/utils/pointer"
@ -36,12 +37,11 @@ func TestLookupRuntimeHandler(t *testing.T) {
{rcn: pointer.StringPtr(""), expected: ""},
{rcn: pointer.StringPtr(rctest.EmptyRuntimeClass), expected: ""},
{rcn: pointer.StringPtr(rctest.SandboxRuntimeClass), expected: "kata-containers"},
{rcn: pointer.StringPtr(rctest.InvalidRuntimeClass), expectError: true},
{rcn: pointer.StringPtr("phantom"), expectError: true},
}
manager := runtimeclass.NewManager(rctest.NewPopulatedDynamicClient())
defer rctest.StartManagerSync(t, manager)()
manager := runtimeclass.NewManager(rctest.NewPopulatedClient())
defer rctest.StartManagerSync(manager)()
for _, test := range tests {
tname := "nil"

View File

@ -7,14 +7,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/runtimeclass:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/fake:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//staging/src/k8s.io/node-api/pkg/apis/node/v1alpha1:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned/fake:go_default_library",
],
)

View File

@ -17,18 +17,12 @@ limitations under the License.
package testing
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
fakedynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/utils/pointer"
nodev1alpha1 "k8s.io/node-api/pkg/apis/node/v1alpha1"
nodeapiclient "k8s.io/node-api/pkg/client/clientset/versioned"
nodeapifake "k8s.io/node-api/pkg/client/clientset/versioned/fake"
)
const (
@ -39,51 +33,42 @@ const (
// EmptyRuntimeClass is a valid RuntimeClass without a handler pre-populated in the populated dynamic client.
EmptyRuntimeClass = "native"
// InvalidRuntimeClass is an invalid RuntimeClass pre-populated in the populated dynamic client.
InvalidRuntimeClass = "foo"
)
// NewPopulatedDynamicClient creates a dynamic client for use with the runtimeclass.Manager,
// NewPopulatedClient creates a node-api client for use with the runtimeclass.Manager,
// and populates it with a few test RuntimeClass objects.
func NewPopulatedDynamicClient() dynamic.Interface {
invalidRC := NewUnstructuredRuntimeClass(InvalidRuntimeClass, "")
invalidRC.Object["spec"].(map[string]interface{})["runtimeHandler"] = true
client := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(),
NewUnstructuredRuntimeClass(EmptyRuntimeClass, ""),
NewUnstructuredRuntimeClass(SandboxRuntimeClass, SandboxRuntimeHandler),
invalidRC,
func NewPopulatedClient() nodeapiclient.Interface {
return nodeapifake.NewSimpleClientset(
NewRuntimeClass(EmptyRuntimeClass, ""),
NewRuntimeClass(SandboxRuntimeClass, SandboxRuntimeHandler),
)
return client
}
// StartManagerSync runs the manager, and waits for startup by polling for the expected "native"
// RuntimeClass to be populated. Returns a function to stop the manager, which should be called with
// a defer:
// StartManagerSync starts the manager, and waits for the informer cache to sync.
// Returns a function to stop the manager, which should be called with a defer:
// defer StartManagerSync(t, m)()
// Any errors are considered fatal to the test.
func StartManagerSync(t *testing.T, m *runtimeclass.Manager) func() {
func StartManagerSync(m *runtimeclass.Manager) func() {
stopCh := make(chan struct{})
go m.Run(stopCh)
// Wait for informer to populate.
err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
_, err := m.LookupRuntimeHandler(pointer.StringPtr(EmptyRuntimeClass))
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
})
require.NoError(t, err, "Failed to start manager")
m.Start(stopCh)
m.WaitForCacheSync(stopCh)
return func() {
close(stopCh)
}
}
// NewRuntimeClass is a helper to generate a RuntimeClass resource with
// the given name & handler.
func NewRuntimeClass(name, handler string) *nodev1alpha1.RuntimeClass {
return &nodev1alpha1.RuntimeClass{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: nodev1alpha1.RuntimeClassSpec{
RuntimeHandler: &handler,
},
}
}
// NewUnstructuredRuntimeClass is a helper to generate an unstructured RuntimeClass resource with
// the given name & handler.
func NewUnstructuredRuntimeClass(name, handler string) *unstructured.Unstructured {