diff --git a/pkg/kubelet/container/testing/os.go b/pkg/kubelet/container/testing/os.go index c1454161aed..6dc59726864 100644 --- a/pkg/kubelet/container/testing/os.go +++ b/pkg/kubelet/container/testing/os.go @@ -36,13 +36,6 @@ type FakeOS struct { Files map[string][]*os.FileInfo } -func NewFakeOS() *FakeOS { - return &FakeOS{ - Removes: []string{}, - Files: make(map[string][]*os.FileInfo), - } -} - // Mkdir is a fake call that just returns nil. func (f *FakeOS) MkdirAll(path string, perm os.FileMode) error { if f.MkdirAllFn != nil { diff --git a/pkg/kubelet/cri/streaming/errors.go b/pkg/kubelet/cri/streaming/errors.go index 5cbe9d505d3..83e218ddb4f 100644 --- a/pkg/kubelet/cri/streaming/errors.go +++ b/pkg/kubelet/cri/streaming/errors.go @@ -24,11 +24,6 @@ import ( grpcstatus "google.golang.org/grpc/status" ) -// NewErrorStreamingDisabled creates an error for disabled streaming method. -func NewErrorStreamingDisabled(method string) error { - return grpcstatus.Errorf(codes.NotFound, "streaming method %s disabled", method) -} - // NewErrorTooManyInFlight creates an error for exceeding the maximum number of in-flight requests. func NewErrorTooManyInFlight() error { return grpcstatus.Error(codes.ResourceExhausted, "maximum number of in-flight requests exceeded") diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go b/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go index 854294806b0..6e442cf2cc2 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go @@ -32,11 +32,6 @@ type fakeStore struct { var _ Store = (*fakeStore)(nil) -// NewFakeStore constructs a fake Store -func NewFakeStore() Store { - return &fakeStore{} -} - func (s *fakeStore) Initialize() error { return fmt.Errorf("Initialize method not supported") } diff --git a/pkg/kubelet/kubeletconfig/status/status.go b/pkg/kubelet/kubeletconfig/status/status.go index 0def8ebd630..c540b57e573 100644 --- a/pkg/kubelet/kubeletconfig/status/status.go +++ b/pkg/kubelet/kubeletconfig/status/status.go @@ -17,17 +17,8 @@ limitations under the License. package status import ( - "context" - "fmt" - "sync" - - "k8s.io/klog/v2" - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" - nodeutil "k8s.io/component-helpers/node/util" ) const ( @@ -64,122 +55,3 @@ type NodeConfigStatus interface { // Sync patches the current status into the Node identified by `nodeName` if an update is pending Sync(client clientset.Interface, nodeName string) } - -type nodeConfigStatus struct { - // status is the core NodeConfigStatus that we report - status apiv1.NodeConfigStatus - // mux is a mutex on the nodeConfigStatus, alternate between setting and syncing the status - mux sync.Mutex - // errorOverride is sent in place of the usual error if it is non-empty - errorOverride string - // syncCh; write to this channel to indicate that the status needs to be synced to the API server - syncCh chan bool -} - -// NewNodeConfigStatus returns a new NodeConfigStatus interface -func NewNodeConfigStatus() NodeConfigStatus { - // channels must have capacity at least 1, since we signal with non-blocking writes - syncCh := make(chan bool, 1) - // prime new status managers to sync with the API server on the first call to Sync - syncCh <- true - return &nodeConfigStatus{ - syncCh: syncCh, - } -} - -// transact grabs the lock, performs the fn, records the need to sync, and releases the lock -func (s *nodeConfigStatus) transact(fn func()) { - s.mux.Lock() - defer s.mux.Unlock() - fn() - s.sync() -} - -func (s *nodeConfigStatus) SetAssigned(source *apiv1.NodeConfigSource) { - s.transact(func() { - s.status.Assigned = source - }) -} - -func (s *nodeConfigStatus) SetActive(source *apiv1.NodeConfigSource) { - s.transact(func() { - s.status.Active = source - }) -} - -func (s *nodeConfigStatus) SetLastKnownGood(source *apiv1.NodeConfigSource) { - s.transact(func() { - s.status.LastKnownGood = source - }) -} - -func (s *nodeConfigStatus) SetError(err string) { - s.transact(func() { - s.status.Error = err - }) -} - -func (s *nodeConfigStatus) SetErrorOverride(err string) { - s.transact(func() { - s.errorOverride = err - }) -} - -// sync notes that the status needs to be synced to the API server -func (s *nodeConfigStatus) sync() { - select { - case s.syncCh <- true: - default: - } -} - -// Sync attempts to sync the status with the Node object for this Kubelet, -// if syncing fails, an error is logged, and work is queued for retry. -func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) { - select { - case <-s.syncCh: - default: - // no work to be done, return - return - } - - klog.InfoS("Kubelet config controller updating Node.Status.Config") - - // grab the lock - s.mux.Lock() - defer s.mux.Unlock() - - // if the sync fails, we want to retry - var err error - defer func() { - if err != nil { - klog.ErrorS(err, "Kubelet config controller") - s.sync() - } - }() - - // get the Node so we can check the current status - oldNode, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - err = fmt.Errorf("could not get Node %q, will not sync status, error: %v", nodeName, err) - return - } - - status := &s.status - // override error, if necessary - if len(s.errorOverride) > 0 { - // copy the status, so we don't overwrite the prior error - // with the override - status = status.DeepCopy() - status.Error = s.errorOverride - } - - // apply the status to a copy of the node so we don't modify the object in the informer's store - newNode := oldNode.DeepCopy() - newNode.Status.Config = status - - // patch the node with the new status - if _, _, err := nodeutil.PatchNodeStatus(client.CoreV1(), types.NodeName(nodeName), oldNode, newNode); err != nil { - klog.ErrorS(err, "Kubelet config controller failed to patch node status") - } -} diff --git a/pkg/kubelet/kubeletconfig/util/codec/codec.go b/pkg/kubelet/kubeletconfig/util/codec/codec.go index 6f12c4671c5..8598c0ca2d9 100644 --- a/pkg/kubelet/kubeletconfig/util/codec/codec.go +++ b/pkg/kubelet/kubeletconfig/util/codec/codec.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/component-base/codec" - "k8s.io/kubernetes/pkg/api/legacyscheme" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1" @@ -62,24 +61,6 @@ func NewKubeletconfigYAMLEncoder(targetVersion schema.GroupVersion) (runtime.Enc return codecs.EncoderForVersion(info.Serializer, targetVersion), nil } -// NewYAMLEncoder generates a new runtime.Encoder that encodes objects to YAML. -func NewYAMLEncoder(groupName string) (runtime.Encoder, error) { - // encode to YAML - mediaType := "application/yaml" - info, ok := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), mediaType) - if !ok { - return nil, fmt.Errorf("unsupported media type %q", mediaType) - } - - versions := legacyscheme.Scheme.PrioritizedVersionsForGroup(groupName) - if len(versions) == 0 { - return nil, fmt.Errorf("no enabled versions for group %q", groupName) - } - - // the "best" version supposedly comes first in the list returned from legacyscheme.Registry.EnabledVersionsForGroup. - return legacyscheme.Codecs.EncoderForVersion(info.Serializer, versions[0]), nil -} - // DecodeKubeletConfiguration decodes a serialized KubeletConfiguration to the internal type. func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []byte) (*kubeletconfig.KubeletConfiguration, error) { var ( diff --git a/pkg/kubelet/kubeletconfig/util/equal/equal.go b/pkg/kubelet/kubeletconfig/util/equal/equal.go deleted file mode 100644 index b943e019f92..00000000000 --- a/pkg/kubelet/kubeletconfig/util/equal/equal.go +++ /dev/null @@ -1,24 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package equal - -import apiv1 "k8s.io/api/core/v1" - -// KubeletConfigOkEq returns true if the two conditions are semantically equivalent in the context of dynamic config -func KubeletConfigOkEq(a, b *apiv1.NodeCondition) bool { - return a.Message == b.Message && a.Reason == b.Reason && a.Status == b.Status -} diff --git a/pkg/kubelet/kubeletconfig/util/panic/panic.go b/pkg/kubelet/kubeletconfig/util/panic/panic.go deleted file mode 100644 index f1916ebf731..00000000000 --- a/pkg/kubelet/kubeletconfig/util/panic/panic.go +++ /dev/null @@ -1,36 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package panic - -import utilruntime "k8s.io/apimachinery/pkg/util/runtime" - -// HandlePanic returns a function that wraps `fn` with the utilruntime.PanicHandlers, and continues -// to bubble the panic after the PanicHandlers are called -func HandlePanic(fn func()) func() { - return func() { - defer func() { - if r := recover(); r != nil { - for _, fn := range utilruntime.PanicHandlers { - fn(r) - } - panic(r) - } - }() - // call the function - fn() - } -} diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go b/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go index e9a40710ea0..3c845a7bc24 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go @@ -30,8 +30,8 @@ import ( "k8s.io/klog/v2" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" - v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1" - v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2" + "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2" ) type exampleHandler struct { @@ -117,28 +117,11 @@ func (p *exampleHandler) DeRegisterPlugin(pluginName string) { p.SendEvent(pluginName, exampleEventDeRegister) } -func (p *exampleHandler) EventChan(pluginName string) chan examplePluginEvent { - return p.eventChans[pluginName] -} - func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) { klog.V(2).InfoS("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName]) p.eventChans[pluginName] <- event } -func (p *exampleHandler) AddPluginName(pluginName string) { - p.m.Lock() - defer p.m.Unlock() - - v, ok := p.ExpectedNames[pluginName] - if !ok { - p.eventChans[pluginName] = make(chan examplePluginEvent) - v = 1 - } - - p.ExpectedNames[pluginName] = v -} - func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok bool) { p.m.Lock() defer p.m.Unlock() diff --git a/pkg/kubelet/util/ioutils/ioutils.go b/pkg/kubelet/util/ioutils/ioutils.go index 1b2b5a6d5dd..21760543085 100644 --- a/pkg/kubelet/util/ioutils/ioutils.go +++ b/pkg/kubelet/util/ioutils/ioutils.go @@ -18,24 +18,6 @@ package ioutils import "io" -// writeCloserWrapper represents a WriteCloser whose closer operation is noop. -type writeCloserWrapper struct { - Writer io.Writer -} - -func (w *writeCloserWrapper) Write(buf []byte) (int, error) { - return w.Writer.Write(buf) -} - -func (w *writeCloserWrapper) Close() error { - return nil -} - -// WriteCloserWrapper returns a writeCloserWrapper. -func WriteCloserWrapper(w io.Writer) io.WriteCloser { - return &writeCloserWrapper{w} -} - // LimitWriter is a copy of the standard library ioutils.LimitReader, // applied to the writer interface. // LimitWriter returns a Writer that writes to w