mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Merge pull request #81206 from tallclair/staticcheck-kubelet-push
Cleanup Kubelet static analysis issues
This commit is contained in:
commit
a3488b4cee
@ -34,41 +34,11 @@ pkg/credentialprovider/aws
|
||||
pkg/credentialprovider/azure
|
||||
pkg/kubeapiserver/admission
|
||||
pkg/kubectl/cmd/get
|
||||
pkg/kubelet
|
||||
pkg/kubelet/apis/config
|
||||
pkg/kubelet/apis/podresources
|
||||
pkg/kubelet/apis/stats/v1alpha1
|
||||
pkg/kubelet/checkpoint
|
||||
pkg/kubelet/cm
|
||||
pkg/kubelet/cm/cpumanager
|
||||
pkg/kubelet/cm/cpumanager/state
|
||||
pkg/kubelet/cm/devicemanager
|
||||
pkg/kubelet/config
|
||||
pkg/kubelet/container/testing
|
||||
pkg/kubelet/dockershim/libdocker
|
||||
pkg/kubelet/dockershim/network/cni
|
||||
pkg/kubelet/dockershim/network/kubenet
|
||||
pkg/kubelet/eviction
|
||||
pkg/kubelet/kubeletconfig/checkpoint
|
||||
pkg/kubelet/kuberuntime
|
||||
pkg/kubelet/kuberuntime/logs
|
||||
pkg/kubelet/lifecycle
|
||||
pkg/kubelet/network/dns
|
||||
pkg/kubelet/pluginmanager
|
||||
pkg/kubelet/pluginmanager/metrics
|
||||
pkg/kubelet/pluginmanager/operationexecutor
|
||||
pkg/kubelet/pluginmanager/pluginwatcher
|
||||
pkg/kubelet/pluginmanager/reconciler
|
||||
pkg/kubelet/remote
|
||||
pkg/kubelet/remote/fake
|
||||
pkg/kubelet/server
|
||||
pkg/kubelet/server/streaming
|
||||
pkg/kubelet/stats
|
||||
pkg/kubelet/status
|
||||
pkg/kubelet/types
|
||||
pkg/kubelet/util/format
|
||||
pkg/kubelet/volumemanager/cache
|
||||
pkg/kubelet/volumemanager/reconciler
|
||||
pkg/master
|
||||
pkg/printers
|
||||
pkg/printers/internalversion
|
||||
|
@ -51,7 +51,7 @@ func newActiveDeadlineHandler(
|
||||
|
||||
// check for all required fields
|
||||
if clock == nil || podStatusProvider == nil || recorder == nil {
|
||||
return nil, fmt.Errorf("Required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
|
||||
return nil, fmt.Errorf("required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
|
||||
}
|
||||
return &activeDeadlineHandler{
|
||||
clock: clock,
|
||||
|
@ -79,6 +79,8 @@ func allPrimitiveFieldPaths(t *testing.T, tp reflect.Type, path *field.Path) set
|
||||
return paths
|
||||
}
|
||||
|
||||
//lint:file-ignore U1000 Ignore dummy types, used by tests.
|
||||
|
||||
// dummy helper types
|
||||
type foo struct {
|
||||
foo int
|
||||
|
@ -38,7 +38,7 @@ func GetClient(socket string, connectionTimeout time.Duration, maxMsgSize int) (
|
||||
|
||||
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Error dialing socket %s: %v", socket, err)
|
||||
return nil, nil, fmt.Errorf("error dialing socket %s: %v", socket, err)
|
||||
}
|
||||
return podresourcesapi.NewPodResourcesListerClient(conn), conn, nil
|
||||
}
|
||||
|
@ -146,7 +146,7 @@ type ContainerStats struct {
|
||||
// User defined metrics that are exposed by containers in the pod. Typically, we expect only one container in the pod to be exposing user defined metrics. In the event of multiple containers exposing metrics, they will be combined here.
|
||||
// +patchMergeKey=name
|
||||
// +patchStrategy=merge
|
||||
UserDefinedMetrics []UserDefinedMetric `json:"userDefinedMetrics,omitmepty" patchStrategy:"merge" patchMergeKey:"name"`
|
||||
UserDefinedMetrics []UserDefinedMetric `json:"userDefinedMetrics,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
|
||||
}
|
||||
|
||||
// PodReference contains enough information to locate the referenced pod.
|
||||
|
@ -235,32 +235,32 @@ func isClientConfigStillValid(kubeconfigPath string) (bool, error) {
|
||||
}
|
||||
bootstrapClientConfig, err := loadRESTClientConfig(kubeconfigPath)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to read existing bootstrap client config: %v", err))
|
||||
utilruntime.HandleError(fmt.Errorf("unable to read existing bootstrap client config: %v", err))
|
||||
return false, nil
|
||||
}
|
||||
transportConfig, err := bootstrapClientConfig.TransportConfig()
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to load transport configuration from existing bootstrap client config: %v", err))
|
||||
utilruntime.HandleError(fmt.Errorf("unable to load transport configuration from existing bootstrap client config: %v", err))
|
||||
return false, nil
|
||||
}
|
||||
// has side effect of populating transport config data fields
|
||||
if _, err := transport.TLSConfigFor(transportConfig); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to load TLS configuration from existing bootstrap client config: %v", err))
|
||||
utilruntime.HandleError(fmt.Errorf("unable to load TLS configuration from existing bootstrap client config: %v", err))
|
||||
return false, nil
|
||||
}
|
||||
certs, err := certutil.ParseCertsPEM(transportConfig.TLS.CertData)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to load TLS certificates from existing bootstrap client config: %v", err))
|
||||
utilruntime.HandleError(fmt.Errorf("unable to load TLS certificates from existing bootstrap client config: %v", err))
|
||||
return false, nil
|
||||
}
|
||||
if len(certs) == 0 {
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to read TLS certificates from existing bootstrap client config: %v", err))
|
||||
utilruntime.HandleError(fmt.Errorf("unable to read TLS certificates from existing bootstrap client config: %v", err))
|
||||
return false, nil
|
||||
}
|
||||
now := time.Now()
|
||||
for _, cert := range certs {
|
||||
if now.After(cert.NotAfter) {
|
||||
utilruntime.HandleError(fmt.Errorf("Part of the existing bootstrap client certificate is expired: %s", cert.NotAfter))
|
||||
utilruntime.HandleError(fmt.Errorf("part of the existing bootstrap client certificate is expired: %s", cert.NotAfter))
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
@ -91,9 +91,7 @@ func getPodKey(pod *v1.Pod) string {
|
||||
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
|
||||
pods := make([]*v1.Pod, 0)
|
||||
|
||||
var err error
|
||||
checkpointKeys := []string{}
|
||||
checkpointKeys, err = cpm.ListCheckpoints()
|
||||
checkpointKeys, err := cpm.ListCheckpoints()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list checkpoints: %v", err)
|
||||
}
|
||||
|
@ -303,7 +303,7 @@ func (m *cgroupManagerImpl) Destroy(cgroupConfig *CgroupConfig) error {
|
||||
|
||||
// Delete cgroups using libcontainers Managers Destroy() method
|
||||
if err = manager.Destroy(); err != nil {
|
||||
return fmt.Errorf("Unable to destroy cgroup paths for cgroup %v : %v", cgroupConfig.Name, err)
|
||||
return fmt.Errorf("unable to destroy cgroup paths for cgroup %v : %v", cgroupConfig.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -346,14 +346,14 @@ func setSupportedSubsystems(cgroupConfig *libcontainerconfigs.Cgroup) error {
|
||||
for sys, required := range getSupportedSubsystems() {
|
||||
if _, ok := cgroupConfig.Paths[sys.Name()]; !ok {
|
||||
if required {
|
||||
return fmt.Errorf("Failed to find subsystem mount for required subsystem: %v", sys.Name())
|
||||
return fmt.Errorf("failed to find subsystem mount for required subsystem: %v", sys.Name())
|
||||
}
|
||||
// the cgroup is not mounted, but its not required so continue...
|
||||
klog.V(6).Infof("Unable to find subsystem mount for optional subsystem: %v", sys.Name())
|
||||
continue
|
||||
}
|
||||
if err := sys.Set(cgroupConfig.Paths[sys.Name()], cgroupConfig); err != nil {
|
||||
return fmt.Errorf("Failed to set config for supported subsystems : %v", err)
|
||||
return fmt.Errorf("failed to set config for supported subsystems : %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -560,14 +560,14 @@ func getStatsSupportedSubsystems(cgroupPaths map[string]string) (*libcontainercg
|
||||
for sys, required := range getSupportedSubsystems() {
|
||||
if _, ok := cgroupPaths[sys.Name()]; !ok {
|
||||
if required {
|
||||
return nil, fmt.Errorf("Failed to find subsystem mount for required subsystem: %v", sys.Name())
|
||||
return nil, fmt.Errorf("failed to find subsystem mount for required subsystem: %v", sys.Name())
|
||||
}
|
||||
// the cgroup is not mounted, but its not required so continue...
|
||||
klog.V(6).Infof("Unable to find subsystem mount for optional subsystem: %v", sys.Name())
|
||||
continue
|
||||
}
|
||||
if err := sys.GetStats(cgroupPaths[sys.Name()], stats); err != nil {
|
||||
return nil, fmt.Errorf("Failed to get stats for supported subsystems : %v", err)
|
||||
return nil, fmt.Errorf("failed to get stats for supported subsystems : %v", err)
|
||||
}
|
||||
}
|
||||
return stats, nil
|
||||
|
@ -110,7 +110,6 @@ type containerManagerImpl struct {
|
||||
status Status
|
||||
// External containers being managed.
|
||||
systemContainers []*systemContainer
|
||||
qosContainers QOSContainersInfo
|
||||
// Tasks that are run periodically
|
||||
periodicTasks []func()
|
||||
// Holds all the mounted cgroup subsystems
|
||||
@ -215,7 +214,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
|
||||
// If there is more than one line (table headers) in /proc/swaps, swap is enabled and we should
|
||||
// error out unless --fail-swap-on is set to false.
|
||||
if len(swapLines) > 1 {
|
||||
return nil, fmt.Errorf("Running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
|
||||
return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
|
||||
}
|
||||
}
|
||||
|
||||
@ -396,7 +395,7 @@ func setupKernelTunables(option KernelTunableBehavior) error {
|
||||
|
||||
switch option {
|
||||
case KernelTunableError:
|
||||
errList = append(errList, fmt.Errorf("Invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val))
|
||||
errList = append(errList, fmt.Errorf("invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val))
|
||||
case KernelTunableWarn:
|
||||
klog.V(2).Infof("Invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val)
|
||||
case KernelTunableModify:
|
||||
|
@ -52,11 +52,8 @@ type staticPolicyMultiContainerTest struct {
|
||||
stAssignments state.ContainerCPUAssignments
|
||||
stDefaultCPUSet cpuset.CPUSet
|
||||
pod *v1.Pod
|
||||
expErr error
|
||||
expCPUAlloc bool
|
||||
expInitCSets []cpuset.CPUSet
|
||||
expCSets []cpuset.CPUSet
|
||||
expPanic bool
|
||||
}
|
||||
|
||||
func TestStaticPolicyName(t *testing.T) {
|
||||
|
@ -32,6 +32,7 @@ go_test(
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
"//pkg/kubelet/cm/cpumanager/state/testing:go_default_library",
|
||||
"//pkg/kubelet/cm/cpuset:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -51,6 +51,7 @@ func NewCheckpointState(stateDir, checkpointName, policyName string) (State, err
|
||||
}
|
||||
|
||||
if err := stateCheckpoint.restoreState(); err != nil {
|
||||
//lint:ignore ST1005 user-facing error message
|
||||
return nil, fmt.Errorf("could not restore state from checkpoint: %v\n"+
|
||||
"Please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet.",
|
||||
err, path.Join(stateDir, checkpointName))
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
)
|
||||
@ -68,6 +69,7 @@ func TestCheckpointToFileCompatibility(t *testing.T) {
|
||||
defer cpm.RemoveCheckpoint(compatibilityTestingCheckpoint)
|
||||
|
||||
checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none")
|
||||
require.NoError(t, err)
|
||||
|
||||
checkpointState.SetDefaultCPUSet(state.defaultCPUSet)
|
||||
checkpointState.SetCPUAssignments(state.assignments)
|
||||
|
@ -20,10 +20,11 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
)
|
||||
|
||||
type stateFileData struct {
|
||||
@ -144,7 +145,6 @@ func (sf *stateFile) storeState() {
|
||||
if err = ioutil.WriteFile(sf.stateFilePath, content, 0644); err != nil {
|
||||
panic("[cpumanager] state file not written")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
|
||||
|
@ -276,12 +276,12 @@ func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, version
|
||||
|
||||
e, err := newEndpointImpl(endpoint, pluginName, m.callback)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", endpoint, err)
|
||||
return fmt.Errorf("failed to dial device plugin with socketPath %s: %v", endpoint, err)
|
||||
}
|
||||
|
||||
options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get device plugin options: %v", err)
|
||||
return fmt.Errorf("failed to get device plugin options: %v", err)
|
||||
}
|
||||
|
||||
m.registerEndpoint(pluginName, options, e)
|
||||
@ -697,7 +697,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
|
||||
m.mutex.Lock()
|
||||
m.allocatedDevices = m.podDevices.devices()
|
||||
m.mutex.Unlock()
|
||||
return fmt.Errorf("Unknown Device Plugin %s", resource)
|
||||
return fmt.Errorf("unknown Device Plugin %s", resource)
|
||||
}
|
||||
|
||||
devs := allocDevices.UnsortedList()
|
||||
@ -717,7 +717,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
|
||||
}
|
||||
|
||||
if len(resp.ContainerResponses) == 0 {
|
||||
return fmt.Errorf("No containers return in allocation response %v", resp)
|
||||
return fmt.Errorf("no containers return in allocation response %v", resp)
|
||||
}
|
||||
|
||||
// Update internal cached podDevices state.
|
||||
|
@ -109,8 +109,8 @@ func TestDevicePluginReRegistration(t *testing.T) {
|
||||
t.Fatalf("timeout while waiting for manager update")
|
||||
}
|
||||
capacity, allocatable, _ := m.GetCapacity()
|
||||
resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
|
||||
resourceCapacity := capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
|
||||
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
|
||||
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
|
||||
|
||||
@ -125,8 +125,8 @@ func TestDevicePluginReRegistration(t *testing.T) {
|
||||
t.Fatalf("timeout while waiting for manager update")
|
||||
}
|
||||
capacity, allocatable, _ = m.GetCapacity()
|
||||
resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
|
||||
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
|
||||
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
|
||||
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
|
||||
|
||||
@ -142,8 +142,8 @@ func TestDevicePluginReRegistration(t *testing.T) {
|
||||
t.Fatalf("timeout while waiting for manager update")
|
||||
}
|
||||
capacity, allocatable, _ = m.GetCapacity()
|
||||
resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
|
||||
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
|
||||
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
|
||||
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
|
||||
p2.Stop()
|
||||
@ -178,8 +178,8 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
|
||||
t.FailNow()
|
||||
}
|
||||
capacity, allocatable, _ := m.GetCapacity()
|
||||
resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
|
||||
resourceCapacity := capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
|
||||
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
|
||||
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
|
||||
|
||||
@ -194,8 +194,8 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
|
||||
}
|
||||
|
||||
capacity, allocatable, _ = m.GetCapacity()
|
||||
resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
|
||||
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
|
||||
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
|
||||
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
|
||||
|
||||
@ -211,8 +211,8 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
|
||||
}
|
||||
|
||||
capacity, allocatable, _ = m.GetCapacity()
|
||||
resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
|
||||
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
|
||||
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
|
||||
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
|
||||
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed")
|
||||
p2.Stop()
|
||||
@ -373,17 +373,14 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
|
||||
e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
|
||||
capacity, allocatable, removed := testManager.GetCapacity()
|
||||
as.Equal([]string{resourceName1}, removed)
|
||||
_, ok = capacity[v1.ResourceName(resourceName1)]
|
||||
as.False(ok)
|
||||
as.NotContains(capacity, v1.ResourceName(resourceName1))
|
||||
as.NotContains(allocatable, v1.ResourceName(resourceName1))
|
||||
val, ok := capacity[v1.ResourceName(resourceName2)]
|
||||
as.True(ok)
|
||||
as.Equal(int64(3), val.Value())
|
||||
_, ok = testManager.healthyDevices[resourceName1]
|
||||
as.False(ok)
|
||||
_, ok = testManager.unhealthyDevices[resourceName1]
|
||||
as.False(ok)
|
||||
_, ok = testManager.endpoints[resourceName1]
|
||||
as.False(ok)
|
||||
as.NotContains(testManager.healthyDevices, resourceName1)
|
||||
as.NotContains(testManager.unhealthyDevices, resourceName1)
|
||||
as.NotContains(testManager.endpoints, resourceName1)
|
||||
as.Equal(1, len(testManager.endpoints))
|
||||
|
||||
// Stops resourceName2 endpoint. Verifies its stopTime is set, allocate and
|
||||
@ -417,12 +414,14 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
|
||||
err = testManager.readCheckpoint()
|
||||
as.Nil(err)
|
||||
as.Equal(1, len(testManager.endpoints))
|
||||
_, ok = testManager.endpoints[resourceName2]
|
||||
as.True(ok)
|
||||
as.Contains(testManager.endpoints, resourceName2)
|
||||
capacity, allocatable, removed = testManager.GetCapacity()
|
||||
val, ok = capacity[v1.ResourceName(resourceName2)]
|
||||
as.True(ok)
|
||||
as.Equal(int64(0), val.Value())
|
||||
val, ok = allocatable[v1.ResourceName(resourceName2)]
|
||||
as.True(ok)
|
||||
as.Equal(int64(0), val.Value())
|
||||
as.Empty(removed)
|
||||
as.True(testManager.isDevicePluginResource(resourceName2))
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ func (cm *containerManagerImpl) validateNodeAllocatable() error {
|
||||
}
|
||||
|
||||
if len(errors) > 0 {
|
||||
return fmt.Errorf("Invalid Node Allocatable configuration. %s", strings.Join(errors, " "))
|
||||
return fmt.Errorf("invalid Node Allocatable configuration. %s", strings.Join(errors, " "))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -49,7 +49,6 @@ type QOSContainerManager interface {
|
||||
|
||||
type qosContainerManagerImpl struct {
|
||||
sync.Mutex
|
||||
nodeInfo *v1.Node
|
||||
qosContainersInfo QOSContainersInfo
|
||||
subsystems *CgroupSubsystems
|
||||
cgroupManager CgroupManager
|
||||
|
@ -164,8 +164,8 @@ func (m *manager) calculateAffinity(pod v1.Pod, container v1.Container) Topology
|
||||
// Get the TopologyHints from a provider.
|
||||
hints := provider.GetTopologyHints(pod, container)
|
||||
|
||||
// If hints is nil, insert a single, preferred any-socket hint into allProviderHints.
|
||||
if hints == nil || len(hints) == 0 {
|
||||
// If hints is empty, insert a single, preferred any-socket hint into allProviderHints.
|
||||
if len(hints) == 0 {
|
||||
klog.Infof("[topologymanager] Hint Provider has no preference for socket affinity with any resource")
|
||||
affinity, _ := socketmask.NewSocketMask()
|
||||
affinity.Fill()
|
||||
@ -294,7 +294,7 @@ func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitR
|
||||
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
|
||||
result := m.calculateAffinity(*pod, container)
|
||||
admitPod := m.policy.CanAdmitPodResult(result.Preferred)
|
||||
if admitPod.Admit == false {
|
||||
if !admitPod.Admit {
|
||||
return admitPod
|
||||
}
|
||||
c[container.Name] = result
|
||||
|
@ -382,7 +382,7 @@ func TestPodUpdateAnnotations(t *testing.T) {
|
||||
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
||||
|
||||
pod := CreateValidPod("foo2", "new")
|
||||
pod.Annotations = make(map[string]string, 0)
|
||||
pod.Annotations = make(map[string]string)
|
||||
pod.Annotations["kubernetes.io/blah"] = "blah"
|
||||
|
||||
clone := pod.DeepCopy()
|
||||
@ -411,7 +411,7 @@ func TestPodUpdateLabels(t *testing.T) {
|
||||
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
||||
|
||||
pod := CreateValidPod("foo2", "new")
|
||||
pod.Labels = make(map[string]string, 0)
|
||||
pod.Labels = make(map[string]string)
|
||||
pod.Labels["key"] = "value"
|
||||
|
||||
clone := pod.DeepCopy()
|
||||
@ -432,7 +432,7 @@ func TestPodRestore(t *testing.T) {
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
pod := CreateValidPod("api-server", "kube-default")
|
||||
pod.Annotations = make(map[string]string, 0)
|
||||
pod.Annotations = make(map[string]string)
|
||||
pod.Annotations["kubernetes.io/config.source"] = kubetypes.ApiserverSource
|
||||
pod.Annotations[core.BootstrapCheckpointAnnotationKey] = "true"
|
||||
|
||||
|
@ -130,11 +130,10 @@ func TestWatchFileChanged(t *testing.T) {
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
lock *sync.Mutex
|
||||
desc string
|
||||
linkedFile string
|
||||
pod runtime.Object
|
||||
expected kubetypes.PodUpdate
|
||||
lock *sync.Mutex
|
||||
desc string
|
||||
pod runtime.Object
|
||||
expected kubetypes.PodUpdate
|
||||
}
|
||||
|
||||
func getTestCases(hostname types.NodeName) []*testCase {
|
||||
|
@ -106,7 +106,7 @@ func (s *sourceURL) extractFromURL() error {
|
||||
return fmt.Errorf("zero-length data received from %v", s.url)
|
||||
}
|
||||
// Short circuit if the data has not changed since the last time it was read.
|
||||
if bytes.Compare(data, s.data) == 0 {
|
||||
if bytes.Equal(data, s.data) {
|
||||
return nil
|
||||
}
|
||||
s.data = data
|
||||
@ -138,6 +138,6 @@ func (s *sourceURL) extractFromURL() error {
|
||||
}
|
||||
|
||||
return fmt.Errorf("%v: received '%v', but couldn't parse as "+
|
||||
"single (%v) or multiple pods (%v).\n",
|
||||
"single (%v) or multiple pods (%v)",
|
||||
s.url, string(data), singlePodErr, multiPodErr)
|
||||
}
|
||||
|
@ -116,11 +116,11 @@ func (p *PodSyncResult) Fail(err error) {
|
||||
func (p *PodSyncResult) Error() error {
|
||||
errlist := []error{}
|
||||
if p.SyncError != nil {
|
||||
errlist = append(errlist, fmt.Errorf("failed to SyncPod: %v\n", p.SyncError))
|
||||
errlist = append(errlist, fmt.Errorf("failed to SyncPod: %v", p.SyncError))
|
||||
}
|
||||
for _, result := range p.SyncResults {
|
||||
if result.Error != nil {
|
||||
errlist = append(errlist, fmt.Errorf("failed to %q for %q with %v: %q\n", result.Action, result.Target,
|
||||
errlist = append(errlist, fmt.Errorf("failed to %q for %q with %v: %q", result.Action, result.Target,
|
||||
result.Error, result.Message))
|
||||
}
|
||||
}
|
||||
|
@ -29,12 +29,12 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
. "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
type FakePod struct {
|
||||
Pod *Pod
|
||||
Pod *kubecontainer.Pod
|
||||
NetnsPath string
|
||||
}
|
||||
|
||||
@ -44,14 +44,14 @@ type FakeRuntime struct {
|
||||
CalledFunctions []string
|
||||
PodList []*FakePod
|
||||
AllPodList []*FakePod
|
||||
ImageList []Image
|
||||
ImageList []kubecontainer.Image
|
||||
APIPodStatus v1.PodStatus
|
||||
PodStatus PodStatus
|
||||
PodStatus kubecontainer.PodStatus
|
||||
StartedPods []string
|
||||
KilledPods []string
|
||||
StartedContainers []string
|
||||
KilledContainers []string
|
||||
RuntimeStatus *RuntimeStatus
|
||||
RuntimeStatus *kubecontainer.RuntimeStatus
|
||||
VersionInfo string
|
||||
APIVersionInfo string
|
||||
RuntimeType string
|
||||
@ -66,10 +66,10 @@ type FakeStreamingRuntime struct {
|
||||
*FakeRuntime
|
||||
}
|
||||
|
||||
var _ StreamingRuntime = &FakeStreamingRuntime{}
|
||||
var _ kubecontainer.StreamingRuntime = &FakeStreamingRuntime{}
|
||||
|
||||
// FakeRuntime should implement Runtime.
|
||||
var _ Runtime = &FakeRuntime{}
|
||||
var _ kubecontainer.Runtime = &FakeRuntime{}
|
||||
|
||||
type FakeVersion struct {
|
||||
Version string
|
||||
@ -90,18 +90,18 @@ func (fv *FakeVersion) Compare(other string) (int, error) {
|
||||
}
|
||||
|
||||
type podsGetter interface {
|
||||
GetPods(bool) ([]*Pod, error)
|
||||
GetPods(bool) ([]*kubecontainer.Pod, error)
|
||||
}
|
||||
|
||||
type FakeRuntimeCache struct {
|
||||
getter podsGetter
|
||||
}
|
||||
|
||||
func NewFakeRuntimeCache(getter podsGetter) RuntimeCache {
|
||||
func NewFakeRuntimeCache(getter podsGetter) kubecontainer.RuntimeCache {
|
||||
return &FakeRuntimeCache{getter}
|
||||
}
|
||||
|
||||
func (f *FakeRuntimeCache) GetPods() ([]*Pod, error) {
|
||||
func (f *FakeRuntimeCache) GetPods() ([]*kubecontainer.Pod, error) {
|
||||
return f.getter.GetPods(false)
|
||||
}
|
||||
|
||||
@ -177,7 +177,7 @@ func (f *FakeRuntime) Type() string {
|
||||
return f.RuntimeType
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) Version() (Version, error) {
|
||||
func (f *FakeRuntime) Version() (kubecontainer.Version, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -185,7 +185,7 @@ func (f *FakeRuntime) Version() (Version, error) {
|
||||
return &FakeVersion{Version: f.VersionInfo}, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) APIVersion() (Version, error) {
|
||||
func (f *FakeRuntime) APIVersion() (kubecontainer.Version, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -193,7 +193,7 @@ func (f *FakeRuntime) APIVersion() (Version, error) {
|
||||
return &FakeVersion{Version: f.APIVersionInfo}, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) Status() (*RuntimeStatus, error) {
|
||||
func (f *FakeRuntime) Status() (*kubecontainer.RuntimeStatus, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -201,11 +201,11 @@ func (f *FakeRuntime) Status() (*RuntimeStatus, error) {
|
||||
return f.RuntimeStatus, f.StatusErr
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
|
||||
func (f *FakeRuntime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
var pods []*Pod
|
||||
var pods []*kubecontainer.Pod
|
||||
|
||||
f.CalledFunctions = append(f.CalledFunctions, "GetPods")
|
||||
if all {
|
||||
@ -220,7 +220,7 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
|
||||
return pods, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) SyncPod(pod *v1.Pod, _ *PodStatus, _ []v1.Secret, backOff *flowcontrol.Backoff) (result PodSyncResult) {
|
||||
func (f *FakeRuntime) SyncPod(pod *v1.Pod, _ *kubecontainer.PodStatus, _ []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -236,7 +236,7 @@ func (f *FakeRuntime) SyncPod(pod *v1.Pod, _ *PodStatus, _ []v1.Secret, backOff
|
||||
return
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) KillPod(pod *v1.Pod, runningPod Pod, gracePeriodOverride *int64) error {
|
||||
func (f *FakeRuntime) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -271,18 +271,10 @@ func (f *FakeRuntime) KillContainerInPod(container v1.Container, pod *v1.Pod) er
|
||||
|
||||
f.CalledFunctions = append(f.CalledFunctions, "KillContainerInPod")
|
||||
f.KilledContainers = append(f.KilledContainers, container.Name)
|
||||
|
||||
var containers []v1.Container
|
||||
for _, c := range pod.Spec.Containers {
|
||||
if c.Name == container.Name {
|
||||
continue
|
||||
}
|
||||
containers = append(containers, c)
|
||||
}
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error) {
|
||||
func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -291,7 +283,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
|
||||
return &status, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||
func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -299,7 +291,7 @@ func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, container
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) PullImage(image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
|
||||
func (f *FakeRuntime) PullImage(image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -307,7 +299,7 @@ func (f *FakeRuntime) PullImage(image ImageSpec, pullSecrets []v1.Secret, podSan
|
||||
return image.Image, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) GetImageRef(image ImageSpec) (string, error) {
|
||||
func (f *FakeRuntime) GetImageRef(image kubecontainer.ImageSpec) (string, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -320,7 +312,7 @@ func (f *FakeRuntime) GetImageRef(image ImageSpec) (string, error) {
|
||||
return "", f.InspectErr
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) ListImages() ([]Image, error) {
|
||||
func (f *FakeRuntime) ListImages() ([]kubecontainer.Image, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -328,7 +320,7 @@ func (f *FakeRuntime) ListImages() ([]Image, error) {
|
||||
return f.ImageList, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) RemoveImage(image ImageSpec) error {
|
||||
func (f *FakeRuntime) RemoveImage(image kubecontainer.ImageSpec) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -345,7 +337,7 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error {
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy, ready bool, evictNonDeletedPods bool) error {
|
||||
func (f *FakeRuntime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, ready bool, evictNonDeletedPods bool) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -353,7 +345,7 @@ func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy, ready bool, evi
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) DeleteContainer(containerID ContainerID) error {
|
||||
func (f *FakeRuntime) DeleteContainer(containerID kubecontainer.ContainerID) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -361,7 +353,7 @@ func (f *FakeRuntime) DeleteContainer(containerID ContainerID) error {
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) ImageStats() (*ImageStats, error) {
|
||||
func (f *FakeRuntime) ImageStats() (*kubecontainer.ImageStats, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -369,7 +361,7 @@ func (f *FakeRuntime) ImageStats() (*ImageStats, error) {
|
||||
return nil, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
|
||||
func (f *FakeStreamingRuntime) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -377,7 +369,7 @@ func (f *FakeStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdo
|
||||
return &url.URL{Host: FakeHost}, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
|
||||
func (f *FakeStreamingRuntime) GetAttach(id kubecontainer.ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
@ -399,13 +391,13 @@ type FakeContainerCommandRunner struct {
|
||||
Err error
|
||||
|
||||
// actual values when invoked
|
||||
ContainerID ContainerID
|
||||
ContainerID kubecontainer.ContainerID
|
||||
Cmd []string
|
||||
}
|
||||
|
||||
var _ ContainerCommandRunner = &FakeContainerCommandRunner{}
|
||||
var _ kubecontainer.ContainerCommandRunner = &FakeContainerCommandRunner{}
|
||||
|
||||
func (f *FakeContainerCommandRunner) RunInContainer(containerID ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
|
||||
func (f *FakeContainerCommandRunner) RunInContainer(containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
|
||||
// record invoked values
|
||||
f.ContainerID = containerID
|
||||
f.Cmd = cmd
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"k8s.io/client-go/tools/remotecommand"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
. "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
@ -35,7 +35,7 @@ type Mock struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
var _ Runtime = new(Mock)
|
||||
var _ kubecontainer.Runtime = new(Mock)
|
||||
|
||||
func (r *Mock) Start() error {
|
||||
args := r.Called()
|
||||
@ -47,32 +47,32 @@ func (r *Mock) Type() string {
|
||||
return args.Get(0).(string)
|
||||
}
|
||||
|
||||
func (r *Mock) Version() (Version, error) {
|
||||
func (r *Mock) Version() (kubecontainer.Version, error) {
|
||||
args := r.Called()
|
||||
return args.Get(0).(Version), args.Error(1)
|
||||
return args.Get(0).(kubecontainer.Version), args.Error(1)
|
||||
}
|
||||
|
||||
func (r *Mock) APIVersion() (Version, error) {
|
||||
func (r *Mock) APIVersion() (kubecontainer.Version, error) {
|
||||
args := r.Called()
|
||||
return args.Get(0).(Version), args.Error(1)
|
||||
return args.Get(0).(kubecontainer.Version), args.Error(1)
|
||||
}
|
||||
|
||||
func (r *Mock) Status() (*RuntimeStatus, error) {
|
||||
func (r *Mock) Status() (*kubecontainer.RuntimeStatus, error) {
|
||||
args := r.Called()
|
||||
return args.Get(0).(*RuntimeStatus), args.Error(0)
|
||||
return args.Get(0).(*kubecontainer.RuntimeStatus), args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) GetPods(all bool) ([]*Pod, error) {
|
||||
func (r *Mock) GetPods(all bool) ([]*kubecontainer.Pod, error) {
|
||||
args := r.Called(all)
|
||||
return args.Get(0).([]*Pod), args.Error(1)
|
||||
return args.Get(0).([]*kubecontainer.Pod), args.Error(1)
|
||||
}
|
||||
|
||||
func (r *Mock) SyncPod(pod *v1.Pod, status *PodStatus, secrets []v1.Secret, backOff *flowcontrol.Backoff) PodSyncResult {
|
||||
func (r *Mock) SyncPod(pod *v1.Pod, status *kubecontainer.PodStatus, secrets []v1.Secret, backOff *flowcontrol.Backoff) kubecontainer.PodSyncResult {
|
||||
args := r.Called(pod, status, secrets, backOff)
|
||||
return args.Get(0).(PodSyncResult)
|
||||
return args.Get(0).(kubecontainer.PodSyncResult)
|
||||
}
|
||||
|
||||
func (r *Mock) KillPod(pod *v1.Pod, runningPod Pod, gracePeriodOverride *int64) error {
|
||||
func (r *Mock) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
|
||||
args := r.Called(pod, runningPod, gracePeriodOverride)
|
||||
return args.Error(0)
|
||||
}
|
||||
@ -87,64 +87,64 @@ func (r *Mock) KillContainerInPod(container v1.Container, pod *v1.Pod) error {
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error) {
|
||||
func (r *Mock) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
|
||||
args := r.Called(uid, name, namespace)
|
||||
return args.Get(0).(*PodStatus), args.Error(1)
|
||||
return args.Get(0).(*kubecontainer.PodStatus), args.Error(1)
|
||||
}
|
||||
|
||||
func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
|
||||
func (r *Mock) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
|
||||
args := r.Called(containerID, cmd, stdin, stdout, stderr, tty)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
|
||||
func (r *Mock) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
|
||||
args := r.Called(containerID, stdin, stdout, stderr, tty)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||
func (r *Mock) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||
args := r.Called(pod, containerID, logOptions, stdout, stderr)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) PullImage(image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
|
||||
func (r *Mock) PullImage(image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
|
||||
args := r.Called(image, pullSecrets)
|
||||
return image.Image, args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) GetImageRef(image ImageSpec) (string, error) {
|
||||
func (r *Mock) GetImageRef(image kubecontainer.ImageSpec) (string, error) {
|
||||
args := r.Called(image)
|
||||
return args.Get(0).(string), args.Error(1)
|
||||
}
|
||||
|
||||
func (r *Mock) ListImages() ([]Image, error) {
|
||||
func (r *Mock) ListImages() ([]kubecontainer.Image, error) {
|
||||
args := r.Called()
|
||||
return args.Get(0).([]Image), args.Error(1)
|
||||
return args.Get(0).([]kubecontainer.Image), args.Error(1)
|
||||
}
|
||||
|
||||
func (r *Mock) RemoveImage(image ImageSpec) error {
|
||||
func (r *Mock) RemoveImage(image kubecontainer.ImageSpec) error {
|
||||
args := r.Called(image)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error {
|
||||
func (r *Mock) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
|
||||
args := r.Called(pod, port, stream)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) GarbageCollect(gcPolicy ContainerGCPolicy, ready bool, evictNonDeletedPods bool) error {
|
||||
func (r *Mock) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, ready bool, evictNonDeletedPods bool) error {
|
||||
args := r.Called(gcPolicy, ready, evictNonDeletedPods)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) DeleteContainer(containerID ContainerID) error {
|
||||
func (r *Mock) DeleteContainer(containerID kubecontainer.ContainerID) error {
|
||||
args := r.Called(containerID)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (r *Mock) ImageStats() (*ImageStats, error) {
|
||||
func (r *Mock) ImageStats() (*kubecontainer.ImageStats, error) {
|
||||
args := r.Called()
|
||||
return args.Get(0).(*ImageStats), args.Error(1)
|
||||
return args.Get(0).(*kubecontainer.ImageStats), args.Error(1)
|
||||
}
|
||||
|
||||
// UpdatePodCIDR fulfills the cri interface.
|
||||
|
@ -527,7 +527,7 @@ func (ds *dockerService) getDockerVersionFromCache() (*dockertypes.Version, erro
|
||||
}
|
||||
dv, ok := value.(*dockertypes.Version)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Converted to *dockertype.Version error")
|
||||
return nil, fmt.Errorf("converted to *dockertype.Version error")
|
||||
}
|
||||
return dv, nil
|
||||
}
|
||||
|
@ -41,14 +41,14 @@ func (r *streamingRuntime) portForward(podSandboxID string, port int32, stream i
|
||||
containerPid := container.State.Pid
|
||||
socatPath, lookupErr := exec.LookPath("socat")
|
||||
if lookupErr != nil {
|
||||
return fmt.Errorf("unable to do port forwarding: socat not found.")
|
||||
return fmt.Errorf("unable to do port forwarding: socat not found")
|
||||
}
|
||||
|
||||
args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)}
|
||||
|
||||
nsenterPath, lookupErr := exec.LookPath("nsenter")
|
||||
if lookupErr != nil {
|
||||
return fmt.Errorf("unable to do port forwarding: nsenter not found.")
|
||||
return fmt.Errorf("unable to do port forwarding: nsenter not found")
|
||||
}
|
||||
|
||||
commandString := fmt.Sprintf("%s %s", nsenterPath, strings.Join(args, " "))
|
||||
|
@ -75,7 +75,7 @@ func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) {
|
||||
klog.Infof("Connecting to docker on %s", dockerEndpoint)
|
||||
return dockerapi.NewClient(dockerEndpoint, "", nil, nil)
|
||||
}
|
||||
return dockerapi.NewEnvClient()
|
||||
return dockerapi.NewClientWithOpts(dockerapi.FromEnv)
|
||||
}
|
||||
|
||||
// ConnectToDockerOrDie creates docker client connecting to docker daemon.
|
||||
|
@ -52,6 +52,7 @@ go_test(
|
||||
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
|
||||
"//vendor/github.com/containernetworking/cni/pkg/types/020:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/mock:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec/testing:go_default_library",
|
||||
],
|
||||
|
@ -66,7 +66,7 @@ func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name strin
|
||||
return nil, fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
|
||||
}
|
||||
if netnsPath == "" {
|
||||
return nil, fmt.Errorf("Cannot find the network namespace, skipping pod network status for container %q", id)
|
||||
return nil, fmt.Errorf("cannot find the network namespace, skipping pod network status for container %q", id)
|
||||
}
|
||||
|
||||
ips, err := network.GetPodIPs(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
|
||||
types020 "github.com/containernetworking/cni/pkg/types/020"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/api/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
@ -73,6 +74,7 @@ func installPluginUnderTest(t *testing.T, testBinDir, testConfDir, testDataDir,
|
||||
|
||||
pluginExec := path.Join(testBinDir, binName)
|
||||
f, err = os.Create(pluginExec)
|
||||
require.NoError(t, err)
|
||||
|
||||
const execScriptTempl = `#!/usr/bin/env bash
|
||||
cat > {{.InputFile}}
|
||||
@ -329,6 +331,7 @@ func TestCNIPlugin(t *testing.T) {
|
||||
t.Errorf("Expected nil: %v", err)
|
||||
}
|
||||
output, err = ioutil.ReadFile(outputFile)
|
||||
require.NoError(t, err)
|
||||
expectedOutput = "DEL /proc/12345/ns/net podNamespace podName test_infra_container"
|
||||
if string(output) != expectedOutput {
|
||||
t.Errorf("Mismatch in expected output for setup hook. Expected '%s', got '%s'", expectedOutput, string(output))
|
||||
|
@ -37,7 +37,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
ethtoolOutputRegex = regexp.MustCompile("peer_ifindex: (\\d+)")
|
||||
ethtoolOutputRegex = regexp.MustCompile(`peer_ifindex: (\d+)`)
|
||||
)
|
||||
|
||||
func findPairInterfaceOfContainerInterface(e exec.Interface, containerInterfaceName, containerDesc string, nsenterArgs []string) (string, error) {
|
||||
@ -53,12 +53,12 @@ func findPairInterfaceOfContainerInterface(e exec.Interface, containerInterfaceN
|
||||
nsenterArgs = append(nsenterArgs, "-F", "--", ethtoolPath, "--statistics", containerInterfaceName)
|
||||
output, err := e.Command(nsenterPath, nsenterArgs...).CombinedOutput()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Unable to query interface %s of container %s: %v: %s", containerInterfaceName, containerDesc, err, string(output))
|
||||
return "", fmt.Errorf("unable to query interface %s of container %s: %v: %s", containerInterfaceName, containerDesc, err, string(output))
|
||||
}
|
||||
// look for peer_ifindex
|
||||
match := ethtoolOutputRegex.FindSubmatch(output)
|
||||
if match == nil {
|
||||
return "", fmt.Errorf("No peer_ifindex in interface statistics for %s of container %s", containerInterfaceName, containerDesc)
|
||||
return "", fmt.Errorf("no peer_ifindex in interface statistics for %s of container %s", containerInterfaceName, containerDesc)
|
||||
}
|
||||
peerIfIndex, err := strconv.Atoi(string(match[1]))
|
||||
if err != nil { // seems impossible (\d+ not numeric)
|
||||
|
@ -43,7 +43,7 @@ type fakeIPTables struct {
|
||||
|
||||
func NewFakeIPTables() *fakeIPTables {
|
||||
return &fakeIPTables{
|
||||
tables: make(map[string]*fakeTable, 0),
|
||||
tables: make(map[string]*fakeTable),
|
||||
builtinChains: map[string]sets.String{
|
||||
string(utiliptables.TableFilter): sets.NewString("INPUT", "FORWARD", "OUTPUT"),
|
||||
string(utiliptables.TableNAT): sets.NewString("PREROUTING", "INPUT", "OUTPUT", "POSTROUTING"),
|
||||
@ -55,7 +55,7 @@ func NewFakeIPTables() *fakeIPTables {
|
||||
func (f *fakeIPTables) getTable(tableName utiliptables.Table) (*fakeTable, error) {
|
||||
table, ok := f.tables[string(tableName)]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Table %s does not exist", tableName)
|
||||
return nil, fmt.Errorf("table %s does not exist", tableName)
|
||||
}
|
||||
return table, nil
|
||||
}
|
||||
@ -68,7 +68,7 @@ func (f *fakeIPTables) getChain(tableName utiliptables.Table, chainName utilipta
|
||||
|
||||
chain, ok := table.chains[string(chainName)]
|
||||
if !ok {
|
||||
return table, nil, fmt.Errorf("Chain %s/%s does not exist", tableName, chainName)
|
||||
return table, nil, fmt.Errorf("chain %s/%s does not exist", tableName, chainName)
|
||||
}
|
||||
|
||||
return table, chain, nil
|
||||
@ -148,7 +148,7 @@ func (f *fakeIPTables) ensureRule(position utiliptables.RulePosition, tableName
|
||||
} else if position == utiliptables.Append {
|
||||
chain.rules = append(chain.rules, rule)
|
||||
} else {
|
||||
return false, fmt.Errorf("Unknown position argument %q", position)
|
||||
return false, fmt.Errorf("unknown position argument %q", position)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
@ -167,7 +167,7 @@ func normalizeRule(rule string) (string, error) {
|
||||
if remaining[0] == '"' {
|
||||
end = strings.Index(remaining[1:], "\"")
|
||||
if end < 0 {
|
||||
return "", fmt.Errorf("Invalid rule syntax: mismatched quotes")
|
||||
return "", fmt.Errorf("invalid rule syntax: mismatched quotes")
|
||||
}
|
||||
end += 2
|
||||
} else {
|
||||
@ -199,7 +199,7 @@ func (f *fakeIPTables) EnsureRule(position utiliptables.RulePosition, tableName
|
||||
ruleArgs := make([]string, 0)
|
||||
for _, arg := range args {
|
||||
// quote args with internal spaces (like comments)
|
||||
if strings.Index(arg, " ") >= 0 {
|
||||
if strings.Contains(arg, " ") {
|
||||
arg = fmt.Sprintf("\"%s\"", arg)
|
||||
}
|
||||
ruleArgs = append(ruleArgs, arg)
|
||||
@ -288,7 +288,7 @@ func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte,
|
||||
} else if strings.HasPrefix(line, "-A") {
|
||||
parts := strings.Split(line, " ")
|
||||
if len(parts) < 3 {
|
||||
return fmt.Errorf("Invalid iptables rule '%s'", line)
|
||||
return fmt.Errorf("invalid iptables rule '%s'", line)
|
||||
}
|
||||
chainName := utiliptables.Chain(parts[1])
|
||||
rule := strings.TrimPrefix(line, fmt.Sprintf("-A %s ", chainName))
|
||||
@ -299,7 +299,7 @@ func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte,
|
||||
} else if strings.HasPrefix(line, "-I") {
|
||||
parts := strings.Split(line, " ")
|
||||
if len(parts) < 3 {
|
||||
return fmt.Errorf("Invalid iptables rule '%s'", line)
|
||||
return fmt.Errorf("invalid iptables rule '%s'", line)
|
||||
}
|
||||
chainName := utiliptables.Chain(parts[1])
|
||||
rule := strings.TrimPrefix(line, fmt.Sprintf("-I %s ", chainName))
|
||||
@ -310,7 +310,7 @@ func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte,
|
||||
} else if strings.HasPrefix(line, "-X") {
|
||||
parts := strings.Split(line, " ")
|
||||
if len(parts) < 2 {
|
||||
return fmt.Errorf("Invalid iptables rule '%s'", line)
|
||||
return fmt.Errorf("invalid iptables rule '%s'", line)
|
||||
}
|
||||
if err := f.DeleteChain(tableName, utiliptables.Chain(parts[1])); err != nil {
|
||||
return err
|
||||
|
@ -114,7 +114,7 @@ func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName
|
||||
klog.V(4).Info("Ensuring kubelet hostport chains")
|
||||
// Ensure kubeHostportChain
|
||||
if _, err := iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil {
|
||||
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err)
|
||||
return fmt.Errorf("failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err)
|
||||
}
|
||||
tableChainsNeedJumpServices := []struct {
|
||||
table utiliptables.Table
|
||||
@ -131,14 +131,14 @@ func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName
|
||||
// This ensures KUBE-SERVICES chain gets processed first.
|
||||
// Since rules in KUBE-HOSTPORTS chain matches broader cases, allow the more specific rules to be processed first.
|
||||
if _, err := iptables.EnsureRule(utiliptables.Append, tc.table, tc.chain, args...); err != nil {
|
||||
return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err)
|
||||
return fmt.Errorf("failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err)
|
||||
}
|
||||
}
|
||||
if natInterfaceName != "" && natInterfaceName != "lo" {
|
||||
// Need to SNAT traffic from localhost
|
||||
args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"}
|
||||
if _, err := iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
|
||||
return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
|
||||
return fmt.Errorf("failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -249,7 +249,7 @@ func (hm *hostportManager) syncIPTables(lines []byte) error {
|
||||
klog.V(3).Infof("Restoring iptables rules: %s", lines)
|
||||
err := hm.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to execute iptables-restore: %v", err)
|
||||
return fmt.Errorf("failed to execute iptables-restore: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -351,7 +351,7 @@ func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[util
|
||||
}
|
||||
}
|
||||
|
||||
for _, line := range strings.Split(string(iptablesData.Bytes()), "\n") {
|
||||
for _, line := range strings.Split(iptablesData.String(), "\n") {
|
||||
if strings.HasPrefix(line, fmt.Sprintf("-A %s", kubeHostportChainPrefix)) ||
|
||||
strings.HasPrefix(line, fmt.Sprintf("-A %s", string(kubeHostportsChain))) {
|
||||
existingHostportRules = append(existingHostportRules, line)
|
||||
@ -382,8 +382,6 @@ func filterRules(rules []string, filters []utiliptables.Chain) []string {
|
||||
// filterChains deletes all entries of filter chains from chain map
|
||||
func filterChains(chains map[utiliptables.Chain]string, filterChains []utiliptables.Chain) {
|
||||
for _, chain := range filterChains {
|
||||
if _, ok := chains[chain]; ok {
|
||||
delete(chains, chain)
|
||||
}
|
||||
delete(chains, chain)
|
||||
}
|
||||
}
|
||||
|
@ -284,7 +284,7 @@ func TestHostportManager(t *testing.T) {
|
||||
err := iptables.SaveInto(utiliptables.TableNAT, raw)
|
||||
assert.NoError(t, err)
|
||||
|
||||
lines := strings.Split(string(raw.Bytes()), "\n")
|
||||
lines := strings.Split(raw.String(), "\n")
|
||||
expectedLines := map[string]bool{
|
||||
`*nat`: true,
|
||||
`:KUBE-HOSTPORTS - [0:0]`: true,
|
||||
@ -331,7 +331,7 @@ func TestHostportManager(t *testing.T) {
|
||||
raw.Reset()
|
||||
err = iptables.SaveInto(utiliptables.TableNAT, raw)
|
||||
assert.NoError(t, err)
|
||||
lines = strings.Split(string(raw.Bytes()), "\n")
|
||||
lines = strings.Split(raw.String(), "\n")
|
||||
remainingChains := make(map[string]bool)
|
||||
for _, line := range lines {
|
||||
if strings.HasPrefix(line, ":") {
|
||||
|
@ -123,7 +123,7 @@ func gatherAllHostports(activePodPortMappings []*PodPortMapping) (map[*PortMappi
|
||||
podHostportMap := make(map[*PortMapping]targetPod)
|
||||
for _, pm := range activePodPortMappings {
|
||||
if pm.IP.To4() == nil {
|
||||
return nil, fmt.Errorf("Invalid or missing pod %s IP", getPodFullName(pm))
|
||||
return nil, fmt.Errorf("invalid or missing pod %s IP", getPodFullName(pm))
|
||||
}
|
||||
// should not handle hostports for hostnetwork pods
|
||||
if pm.HostNetwork {
|
||||
@ -286,7 +286,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
|
||||
klog.V(3).Infof("Restoring iptables rules: %s", natLines)
|
||||
err = h.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to execute iptables-restore: %v", err)
|
||||
return fmt.Errorf("failed to execute iptables-restore: %v", err)
|
||||
}
|
||||
|
||||
h.cleanupHostportMap(hostportPodMap)
|
||||
|
@ -35,7 +35,7 @@ func (h *fakeSyncer) OpenPodHostportsAndSync(newPortMapping *hostport.PodPortMap
|
||||
func (h *fakeSyncer) SyncHostports(natInterfaceName string, activePortMapping []*hostport.PodPortMapping) error {
|
||||
for _, r := range activePortMapping {
|
||||
if r.IP.To4() == nil {
|
||||
return fmt.Errorf("Invalid or missing pod %s/%s IP", r.Namespace, r.Name)
|
||||
return fmt.Errorf("invalid or missing pod %s/%s IP", r.Namespace, r.Name)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,10 +64,6 @@ const (
|
||||
// ebtables Chain to store dedup rules
|
||||
dedupChain = utilebtables.Chain("KUBE-DEDUP")
|
||||
|
||||
// defaultIPAMDir is the default location for the checkpoint files stored by host-local ipam
|
||||
// https://github.com/containernetworking/cni/tree/master/plugins/ipam/host-local#backends
|
||||
defaultIPAMDir = "/var/lib/cni/networks"
|
||||
|
||||
zeroCIDRv6 = "::/0"
|
||||
zeroCIDRv4 = "0.0.0.0/0"
|
||||
|
||||
@ -184,12 +180,12 @@ func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode kubeletc
|
||||
"type": "loopback"
|
||||
}`))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to generate loopback config: %v", err)
|
||||
return fmt.Errorf("failed to generate loopback config: %v", err)
|
||||
}
|
||||
|
||||
plugin.nsenterPath, err = plugin.execer.LookPath("nsenter")
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to find nsenter binary: %v", err)
|
||||
return fmt.Errorf("failed to find nsenter binary: %v", err)
|
||||
}
|
||||
|
||||
// Need to SNAT outbound traffic from cluster
|
||||
@ -213,7 +209,7 @@ func (plugin *kubenetNetworkPlugin) ensureMasqRule() error {
|
||||
"-m", "addrtype", "!", "--dst-type", "LOCAL",
|
||||
"!", "-d", plugin.nonMasqueradeCIDR,
|
||||
"-j", "MASQUERADE"); err != nil {
|
||||
return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
|
||||
return fmt.Errorf("failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -385,7 +381,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
|
||||
// promiscuous mode is not on, then turn it on.
|
||||
err := netlink.SetPromiscOn(link)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error setting promiscuous mode on %s: %v", BridgeName, err)
|
||||
return fmt.Errorf("error setting promiscuous mode on %s: %v", BridgeName, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -414,7 +410,7 @@ func (plugin *kubenetNetworkPlugin) addTrafficShaping(id kubecontainer.Container
|
||||
shaper := plugin.shaper()
|
||||
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
|
||||
return fmt.Errorf("error reading pod bandwidth annotations: %v", err)
|
||||
}
|
||||
iplist, exists := plugin.getCachedPodIPs(id)
|
||||
if !exists {
|
||||
@ -432,7 +428,7 @@ func (plugin *kubenetNetworkPlugin) addTrafficShaping(id kubecontainer.Container
|
||||
}
|
||||
|
||||
if err := shaper.ReconcileCIDR(fmt.Sprintf("%v/%v", ip, mask), egress, ingress); err != nil {
|
||||
return fmt.Errorf("Failed to add pod to shaper: %v", err)
|
||||
return fmt.Errorf("failed to add pod to shaper: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -475,7 +471,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
|
||||
start := time.Now()
|
||||
|
||||
if err := plugin.Status(); err != nil {
|
||||
return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
|
||||
return fmt.Errorf("kubenet cannot SetUpPod: %v", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
@ -511,7 +507,7 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
|
||||
portMappings, err := plugin.host.GetPodPortMappings(id.ID)
|
||||
if err != nil {
|
||||
errList = append(errList, err)
|
||||
} else if portMappings != nil && len(portMappings) > 0 {
|
||||
} else if len(portMappings) > 0 {
|
||||
if err = plugin.hostportManager.Remove(id.ID, &hostport.PodPortMapping{
|
||||
Namespace: namespace,
|
||||
Name: name,
|
||||
@ -554,7 +550,7 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
|
||||
}()
|
||||
|
||||
if plugin.netConfig == nil {
|
||||
return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods")
|
||||
return fmt.Errorf("kubenet needs a PodCIDR to tear down pods")
|
||||
}
|
||||
|
||||
if err := plugin.teardown(namespace, name, id); err != nil {
|
||||
@ -580,10 +576,10 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
|
||||
// not a cached version, get via network ns
|
||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err)
|
||||
return nil, fmt.Errorf("kubenet failed to retrieve network namespace path: %v", err)
|
||||
}
|
||||
if netnsPath == "" {
|
||||
return nil, fmt.Errorf("Cannot find the network namespace, skipping pod network status for container %q", id)
|
||||
return nil, fmt.Errorf("cannot find the network namespace, skipping pod network status for container %q", id)
|
||||
}
|
||||
ips, err := network.GetPodIPs(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
|
||||
if err != nil {
|
||||
@ -630,7 +626,7 @@ func (plugin *kubenetNetworkPlugin) getNetworkStatus(id kubecontainer.ContainerI
|
||||
func (plugin *kubenetNetworkPlugin) Status() error {
|
||||
// Can't set up pods if we don't have a PodCIDR yet
|
||||
if plugin.netConfig == nil {
|
||||
return fmt.Errorf("Kubenet does not have netConfig. This is most likely due to lack of PodCIDR")
|
||||
return fmt.Errorf("kubenet does not have netConfig. This is most likely due to lack of PodCIDR")
|
||||
}
|
||||
|
||||
if !plugin.checkRequiredCNIPlugins() {
|
||||
@ -687,14 +683,14 @@ func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubeco
|
||||
func (plugin *kubenetNetworkPlugin) addContainerToNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) (cnitypes.Result, error) {
|
||||
rt, err := plugin.buildCNIRuntimeConf(ifName, id, true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error building CNI config: %v", err)
|
||||
return nil, fmt.Errorf("error building CNI config: %v", err)
|
||||
}
|
||||
|
||||
klog.V(3).Infof("Adding %s/%s to '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt)
|
||||
|
||||
res, err := plugin.cniConfig.AddNetwork(context.TODO(), config, rt)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error adding container to network: %v", err)
|
||||
return nil, fmt.Errorf("error adding container to network: %v", err)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
@ -702,7 +698,7 @@ func (plugin *kubenetNetworkPlugin) addContainerToNetwork(config *libcni.Network
|
||||
func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) error {
|
||||
rt, err := plugin.buildCNIRuntimeConf(ifName, id, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error building CNI config: %v", err)
|
||||
return fmt.Errorf("error building CNI config: %v", err)
|
||||
}
|
||||
|
||||
klog.V(3).Infof("Removing %s/%s from '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt)
|
||||
@ -710,7 +706,7 @@ func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.Netwo
|
||||
// The pod may not get deleted successfully at the first time.
|
||||
// Ignore "no such file or directory" error in case the network has already been deleted in previous attempts.
|
||||
if err != nil && !strings.Contains(err.Error(), "no such file or directory") {
|
||||
return fmt.Errorf("Error removing container from network: %v", err)
|
||||
return fmt.Errorf("error removing container from network: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -795,20 +791,20 @@ func (plugin *kubenetNetworkPlugin) disableContainerDAD(id kubecontainer.Contain
|
||||
|
||||
sysctlBin, err := plugin.execer.LookPath("sysctl")
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not find sysctl binary: %s", err)
|
||||
return fmt.Errorf("could not find sysctl binary: %s", err)
|
||||
}
|
||||
|
||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get netns: %v", err)
|
||||
return fmt.Errorf("failed to get netns: %v", err)
|
||||
}
|
||||
if netnsPath == "" {
|
||||
return fmt.Errorf("Pod has no network namespace")
|
||||
return fmt.Errorf("pod has no network namespace")
|
||||
}
|
||||
|
||||
// If the sysctl doesn't exist, it means ipv6 is disabled; log and move on
|
||||
if _, err := plugin.sysctl.GetSysctl(key); err != nil {
|
||||
return fmt.Errorf("Ipv6 not enabled: %v", err)
|
||||
return fmt.Errorf("ipv6 not enabled: %v", err)
|
||||
}
|
||||
|
||||
output, err := plugin.execer.Command(plugin.nsenterPath,
|
||||
@ -816,7 +812,7 @@ func (plugin *kubenetNetworkPlugin) disableContainerDAD(id kubecontainer.Contain
|
||||
sysctlBin, "-w", fmt.Sprintf("%s=%s", key, "0"),
|
||||
).CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to write sysctl: output: %s error: %s",
|
||||
return fmt.Errorf("failed to write sysctl: output: %s error: %s",
|
||||
output, err)
|
||||
}
|
||||
return nil
|
||||
|
@ -161,12 +161,12 @@ func InitNetworkPlugin(plugins []NetworkPlugin, networkPluginName string, host H
|
||||
if chosenPlugin != nil {
|
||||
err := chosenPlugin.Init(host, hairpinMode, nonMasqueradeCIDR, mtu)
|
||||
if err != nil {
|
||||
allErrs = append(allErrs, fmt.Errorf("Network plugin %q failed init: %v", networkPluginName, err))
|
||||
allErrs = append(allErrs, fmt.Errorf("network plugin %q failed init: %v", networkPluginName, err))
|
||||
} else {
|
||||
klog.V(1).Infof("Loaded network plugin %q", networkPluginName)
|
||||
}
|
||||
} else {
|
||||
allErrs = append(allErrs, fmt.Errorf("Network plugin %q not found.", networkPluginName))
|
||||
allErrs = append(allErrs, fmt.Errorf("network plugin %q not found", networkPluginName))
|
||||
}
|
||||
|
||||
return chosenPlugin, utilerrors.NewAggregate(allErrs)
|
||||
@ -235,16 +235,16 @@ func getOnePodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceNam
|
||||
output, err := execer.Command(nsenterPath, fmt.Sprintf("--net=%s", netnsPath), "-F", "--",
|
||||
"ip", "-o", addrType, "addr", "show", "dev", interfaceName, "scope", "global").CombinedOutput()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Unexpected command output %s with error: %v", output, err)
|
||||
return nil, fmt.Errorf("unexpected command output %s with error: %v", output, err)
|
||||
}
|
||||
|
||||
lines := strings.Split(string(output), "\n")
|
||||
if len(lines) < 1 {
|
||||
return nil, fmt.Errorf("Unexpected command output %s", output)
|
||||
return nil, fmt.Errorf("unexpected command output %s", output)
|
||||
}
|
||||
fields := strings.Fields(lines[0])
|
||||
if len(fields) < 4 {
|
||||
return nil, fmt.Errorf("Unexpected address output %s ", lines[0])
|
||||
return nil, fmt.Errorf("unexpected address output %s ", lines[0])
|
||||
}
|
||||
ip, _, err := net.ParseCIDR(fields[3])
|
||||
if err != nil {
|
||||
@ -390,7 +390,7 @@ func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id ku
|
||||
|
||||
netStatus, err := pm.plugin.GetPodNetworkStatus(podNamespace, podName, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("NetworkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), fullPodName, err)
|
||||
return nil, fmt.Errorf("networkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), fullPodName, err)
|
||||
}
|
||||
|
||||
return netStatus, nil
|
||||
@ -404,7 +404,7 @@ func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer
|
||||
|
||||
klog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName)
|
||||
if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations, options); err != nil {
|
||||
return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err)
|
||||
return fmt.Errorf("networkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -418,7 +418,7 @@ func (pm *PluginManager) TearDownPod(podNamespace, podName string, id kubecontai
|
||||
|
||||
klog.V(3).Infof("Calling network plugin %s to tear down pod %q", pm.plugin.Name(), fullPodName)
|
||||
if err := pm.plugin.TearDownPod(podNamespace, podName, id); err != nil {
|
||||
return fmt.Errorf("NetworkPlugin %s failed to teardown pod %q network: %v", pm.plugin.Name(), fullPodName, err)
|
||||
return fmt.Errorf("networkPlugin %s failed to teardown pod %q network: %v", pm.plugin.Name(), fullPodName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -90,7 +90,7 @@ func modifyContainerConfig(sc *runtimeapi.LinuxContainerSecurityContext, config
|
||||
user := config.User
|
||||
if sc.RunAsGroup != nil {
|
||||
if user == "" {
|
||||
return fmt.Errorf("runAsGroup is specified without a runAsUser.")
|
||||
return fmt.Errorf("runAsGroup is specified without a runAsUser")
|
||||
}
|
||||
user = fmt.Sprintf("%s:%d", config.User, sc.GetRunAsGroup().Value)
|
||||
}
|
||||
|
@ -497,7 +497,7 @@ func (m *managerImpl) podEphemeralStorageLimitEviction(podStats statsapi.PodStat
|
||||
}
|
||||
|
||||
podEphemeralStorageTotalUsage := &resource.Quantity{}
|
||||
fsStatsSet := []fsStatsType{}
|
||||
var fsStatsSet []fsStatsType
|
||||
if *m.dedicatedImageFs {
|
||||
fsStatsSet = []fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource}
|
||||
} else {
|
||||
|
@ -135,8 +135,7 @@ func TestUpdateThreshold(t *testing.T) {
|
||||
notifier := &MockCgroupNotifier{}
|
||||
m := newTestMemoryThresholdNotifier(tc.evictionThreshold, notifierFactory, nil)
|
||||
notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, tc.expectedThreshold.Value()).Return(notifier, tc.updateThresholdErr)
|
||||
var events chan<- struct{}
|
||||
events = m.events
|
||||
var events chan<- struct{} = m.events
|
||||
notifier.On("Start", events).Return()
|
||||
err := m.UpdateThreshold(nodeSummary(tc.available, tc.workingSet, tc.usage, isAllocatableEvictionThreshold(tc.evictionThreshold)))
|
||||
if err != nil && !tc.expectErr {
|
||||
@ -169,8 +168,7 @@ func TestStart(t *testing.T) {
|
||||
wg.Done()
|
||||
})
|
||||
notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, int64(0)).Return(notifier, nil)
|
||||
var events chan<- struct{}
|
||||
events = m.events
|
||||
var events chan<- struct{} = m.events
|
||||
notifier.On("Start", events).Return()
|
||||
notifier.On("Stop").Return()
|
||||
|
||||
|
@ -159,11 +159,6 @@ func (kl *Kubelet) getPodResourcesDir() string {
|
||||
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPodResourcesDirName)
|
||||
}
|
||||
|
||||
// getPluginsDirSELinuxLabel returns the selinux label to be applied on plugin directories
|
||||
func (kl *Kubelet) getPluginsDirSELinuxLabel() string {
|
||||
return config.KubeletPluginsDirSELinuxLabel
|
||||
}
|
||||
|
||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
||||
// pods.
|
||||
func (kl *Kubelet) GetPods() []*v1.Pod {
|
||||
@ -292,7 +287,7 @@ func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, err
|
||||
podVolDir := kl.getPodVolumesDir(podUID)
|
||||
|
||||
if pathExists, pathErr := mount.PathExists(podVolDir); pathErr != nil {
|
||||
return volumes, fmt.Errorf("Error checking if path %q exists: %v", podVolDir, pathErr)
|
||||
return volumes, fmt.Errorf("error checking if path %q exists: %v", podVolDir, pathErr)
|
||||
} else if !pathExists {
|
||||
klog.Warningf("Path %q does not exist", podVolDir)
|
||||
return volumes, nil
|
||||
@ -308,7 +303,7 @@ func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, err
|
||||
volumePluginPath := filepath.Join(podVolDir, volumePluginName)
|
||||
volumeDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
|
||||
if err != nil {
|
||||
return volumes, fmt.Errorf("Could not read directory %s: %v", volumePluginPath, err)
|
||||
return volumes, fmt.Errorf("could not read directory %s: %v", volumePluginPath, err)
|
||||
}
|
||||
for _, volumeDir := range volumeDirs {
|
||||
volumes = append(volumes, filepath.Join(volumePluginPath, volumeDir))
|
||||
@ -341,7 +336,7 @@ func (kl *Kubelet) podVolumeSubpathsDirExists(podUID types.UID) (bool, error) {
|
||||
podVolDir := kl.getPodVolumeSubpathsDir(podUID)
|
||||
|
||||
if pathExists, pathErr := mount.PathExists(podVolDir); pathErr != nil {
|
||||
return true, fmt.Errorf("Error checking if path %q exists: %v", podVolDir, pathErr)
|
||||
return true, fmt.Errorf("error checking if path %q exists: %v", podVolDir, pathErr)
|
||||
} else if !pathExists {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -594,7 +594,7 @@ func validateNodeIP(nodeIP net.IP) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", nodeIP.String())
|
||||
return fmt.Errorf("node IP: %q not found in the host's network interfaces", nodeIP.String())
|
||||
}
|
||||
|
||||
// nodeStatusHasChanged compares the original node and current node's status and
|
||||
|
@ -160,15 +160,6 @@ func (lcm *localCM) GetCapacity() v1.ResourceList {
|
||||
return lcm.capacity
|
||||
}
|
||||
|
||||
// sortableNodeAddress is a type for sorting []v1.NodeAddress
|
||||
type sortableNodeAddress []v1.NodeAddress
|
||||
|
||||
func (s sortableNodeAddress) Len() int { return len(s) }
|
||||
func (s sortableNodeAddress) Less(i, j int) bool {
|
||||
return (string(s[i].Type) + s[i].Address) < (string(s[j].Type) + s[j].Address)
|
||||
}
|
||||
func (s sortableNodeAddress) Swap(i, j int) { s[j], s[i] = s[i], s[j] }
|
||||
|
||||
func TestUpdateNewNodeStatus(t *testing.T) {
|
||||
cases := []struct {
|
||||
desc string
|
||||
@ -547,6 +538,7 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
|
||||
kubelet := testKubelet.kubelet
|
||||
kubelet.kubeClient = nil // ensure only the heartbeat client is used
|
||||
kubelet.heartbeatClient, err = clientset.NewForConfig(config)
|
||||
require.NoError(t, err)
|
||||
kubelet.onRepeatedHeartbeatFailure = func() {
|
||||
atomic.AddInt64(&failureCallbacks, 1)
|
||||
}
|
||||
@ -988,7 +980,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
|
||||
|
||||
updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch())
|
||||
require.NoError(t, err)
|
||||
memCapacity, _ := updatedNode.Status.Capacity[v1.ResourceMemory]
|
||||
memCapacity := updatedNode.Status.Capacity[v1.ResourceMemory]
|
||||
updatedMemoryCapacity, _ := (&memCapacity).AsInt64()
|
||||
assert.Equal(t, newMemoryCapacity, updatedMemoryCapacity, "Memory capacity")
|
||||
|
||||
@ -2009,8 +2001,6 @@ func TestRegisterWithApiServerWithTaint(t *testing.T) {
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
|
||||
taintutil.TaintExists(got.Spec.Taints, unschedulableTaint),
|
||||
"test unschedulable taint for TaintNodesByCondition")
|
||||
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -403,7 +403,7 @@ func (kl *Kubelet) GeneratePodHostNameAndDomain(pod *v1.Pod) (string, string, er
|
||||
hostname := pod.Name
|
||||
if len(pod.Spec.Hostname) > 0 {
|
||||
if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Hostname); len(msgs) != 0 {
|
||||
return "", "", fmt.Errorf("Pod Hostname %q is not a valid DNS label: %s", pod.Spec.Hostname, strings.Join(msgs, ";"))
|
||||
return "", "", fmt.Errorf("pod Hostname %q is not a valid DNS label: %s", pod.Spec.Hostname, strings.Join(msgs, ";"))
|
||||
}
|
||||
hostname = pod.Spec.Hostname
|
||||
}
|
||||
@ -416,7 +416,7 @@ func (kl *Kubelet) GeneratePodHostNameAndDomain(pod *v1.Pod) (string, string, er
|
||||
hostDomain := ""
|
||||
if len(pod.Spec.Subdomain) > 0 {
|
||||
if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Subdomain); len(msgs) != 0 {
|
||||
return "", "", fmt.Errorf("Pod Subdomain %q is not a valid DNS label: %s", pod.Spec.Subdomain, strings.Join(msgs, ";"))
|
||||
return "", "", fmt.Errorf("pod Subdomain %q is not a valid DNS label: %s", pod.Spec.Subdomain, strings.Join(msgs, ";"))
|
||||
}
|
||||
hostDomain = fmt.Sprintf("%s.%s.svc.%s", pod.Spec.Subdomain, pod.Namespace, clusterDomain)
|
||||
}
|
||||
@ -581,7 +581,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
configMap, ok := configMaps[name]
|
||||
if !ok {
|
||||
if kl.kubeClient == nil {
|
||||
return result, fmt.Errorf("Couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
return result, fmt.Errorf("couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
}
|
||||
optional := cm.Optional != nil && *cm.Optional
|
||||
configMap, err = kl.configMapManager.GetConfigMap(pod.Namespace, name)
|
||||
@ -616,7 +616,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
secret, ok := secrets[name]
|
||||
if !ok {
|
||||
if kl.kubeClient == nil {
|
||||
return result, fmt.Errorf("Couldn't get secret %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
return result, fmt.Errorf("couldn't get secret %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
}
|
||||
optional := s.Optional != nil && *s.Optional
|
||||
secret, err = kl.secretManager.GetSecret(pod.Namespace, name)
|
||||
@ -690,7 +690,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
configMap, ok := configMaps[name]
|
||||
if !ok {
|
||||
if kl.kubeClient == nil {
|
||||
return result, fmt.Errorf("Couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
return result, fmt.Errorf("couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
}
|
||||
configMap, err = kl.configMapManager.GetConfigMap(pod.Namespace, name)
|
||||
if err != nil {
|
||||
@ -707,7 +707,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
if optional {
|
||||
continue
|
||||
}
|
||||
return result, fmt.Errorf("Couldn't find key %v in ConfigMap %v/%v", key, pod.Namespace, name)
|
||||
return result, fmt.Errorf("couldn't find key %v in ConfigMap %v/%v", key, pod.Namespace, name)
|
||||
}
|
||||
case envVar.ValueFrom.SecretKeyRef != nil:
|
||||
s := envVar.ValueFrom.SecretKeyRef
|
||||
@ -717,7 +717,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
secret, ok := secrets[name]
|
||||
if !ok {
|
||||
if kl.kubeClient == nil {
|
||||
return result, fmt.Errorf("Couldn't get secret %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
return result, fmt.Errorf("couldn't get secret %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
}
|
||||
secret, err = kl.secretManager.GetSecret(pod.Namespace, name)
|
||||
if err != nil {
|
||||
@ -734,7 +734,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
if optional {
|
||||
continue
|
||||
}
|
||||
return result, fmt.Errorf("Couldn't find key %v in Secret %v/%v", key, pod.Namespace, name)
|
||||
return result, fmt.Errorf("couldn't find key %v in Secret %v/%v", key, pod.Namespace, name)
|
||||
}
|
||||
runtimeVal = string(runtimeValBytes)
|
||||
}
|
||||
|
@ -123,22 +123,22 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
|
||||
// If there are still volume directories, do not delete directory
|
||||
volumePaths, err := kl.getPodVolumePathListFromDisk(uid)
|
||||
if err != nil {
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("Orphaned pod %q found, but error %v occurred during reading volume dir from disk", uid, err))
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error %v occurred during reading volume dir from disk", uid, err))
|
||||
continue
|
||||
}
|
||||
if len(volumePaths) > 0 {
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("Orphaned pod %q found, but volume paths are still present on disk", uid))
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but volume paths are still present on disk", uid))
|
||||
continue
|
||||
}
|
||||
|
||||
// If there are any volume-subpaths, do not cleanup directories
|
||||
volumeSubpathExists, err := kl.podVolumeSubpathsDirExists(uid)
|
||||
if err != nil {
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("Orphaned pod %q found, but error %v occurred during reading of volume-subpaths dir from disk", uid, err))
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error %v occurred during reading of volume-subpaths dir from disk", uid, err))
|
||||
continue
|
||||
}
|
||||
if volumeSubpathExists {
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("Orphaned pod %q found, but volume subpaths are still present on disk", uid))
|
||||
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but volume subpaths are still present on disk", uid))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -22,8 +22,6 @@ import (
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
const configMapConfigKey = "kubelet"
|
||||
|
||||
// configMapPayload implements Payload, backed by a v1/ConfigMap config source object
|
||||
type configMapPayload struct {
|
||||
cm *apiv1.ConfigMap
|
||||
|
@ -213,7 +213,7 @@ func latestNodeConfigSource(store cache.Store, nodeName string) (*apiv1.NodeConf
|
||||
utillog.Errorf(err.Error())
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
err := fmt.Errorf("Node %q does not exist in the informer's store, can't sync config source", nodeName)
|
||||
err := fmt.Errorf("node %q does not exist in the informer's store, can't sync config source", nodeName)
|
||||
utillog.Errorf(err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ go_library(
|
||||
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
|
||||
"//vendor/github.com/armon/circbuf:go_default_library",
|
||||
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
|
||||
"//vendor/google.golang.org/grpc:go_default_library",
|
||||
"//vendor/google.golang.org/grpc/status:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
] + select({
|
||||
"@io_bazel_rules_go//go/platform:linux": [
|
||||
@ -122,6 +122,7 @@ go_test(
|
||||
"//vendor/github.com/golang/mock/gomock:go_default_library",
|
||||
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||
"//vendor/k8s.io/utils/pointer:go_default_library",
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
@ -40,7 +41,7 @@ func TestRecordOperation(t *testing.T) {
|
||||
|
||||
prometheusURL := "http://" + temporalServer + "/metrics"
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", prometheus.Handler())
|
||||
mux.Handle("/metrics", promhttp.Handler())
|
||||
server := &http.Server{
|
||||
Addr: temporalServer,
|
||||
Handler: mux,
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
|
||||
"github.com/armon/circbuf"
|
||||
"k8s.io/klog"
|
||||
@ -94,7 +94,8 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
|
||||
// Step 1: pull the image.
|
||||
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
|
||||
if err != nil {
|
||||
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
|
||||
return msg, err
|
||||
}
|
||||
|
||||
@ -117,19 +118,22 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
|
||||
defer cleanupAction()
|
||||
}
|
||||
if err != nil {
|
||||
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
|
||||
return grpc.ErrorDesc(err), ErrCreateContainerConfig
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
|
||||
return s.Message(), ErrCreateContainerConfig
|
||||
}
|
||||
|
||||
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
|
||||
if err != nil {
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
|
||||
return grpc.ErrorDesc(err), ErrCreateContainer
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
|
||||
return s.Message(), ErrCreateContainer
|
||||
}
|
||||
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
|
||||
if err != nil {
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
|
||||
return grpc.ErrorDesc(err), ErrPreStartHook
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
|
||||
return s.Message(), ErrPreStartHook
|
||||
}
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))
|
||||
|
||||
@ -143,8 +147,9 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
|
||||
// Step 3: start the container.
|
||||
err = m.runtimeService.StartContainer(containerID)
|
||||
if err != nil {
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
|
||||
return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
|
||||
return s.Message(), kubecontainer.ErrRunContainer
|
||||
}
|
||||
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))
|
||||
|
||||
@ -744,7 +749,7 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(ctx context.Context, pod *v
|
||||
status, err := m.runtimeService.ContainerStatus(containerID.ID)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("failed to get container status for %v: %v", containerID.String(), err)
|
||||
return fmt.Errorf("Unable to retrieve container logs for %v", containerID.String())
|
||||
return fmt.Errorf("unable to retrieve container logs for %v", containerID.String())
|
||||
}
|
||||
return m.ReadLogs(ctx, status.GetLogPath(), containerID.ID, logOptions, stdout, stderr)
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@ -35,6 +36,7 @@ import (
|
||||
// TestRemoveContainer tests removing the container and its corresponding container logs.
|
||||
func TestRemoveContainer(t *testing.T) {
|
||||
fakeRuntime, _, m, err := createTestRuntimeManager()
|
||||
require.NoError(t, err)
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: "12345678",
|
||||
|
@ -68,7 +68,7 @@ const (
|
||||
|
||||
var (
|
||||
// ErrVersionNotSupported is returned when the api version of runtime interface is not supported
|
||||
ErrVersionNotSupported = errors.New("Runtime api version is not supported")
|
||||
ErrVersionNotSupported = errors.New("runtime api version is not supported")
|
||||
)
|
||||
|
||||
// podStateProvider can determine if a pod is deleted ir terminated
|
||||
@ -842,7 +842,7 @@ func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Contain
|
||||
if ref, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {
|
||||
m.recorder.Eventf(ref, v1.EventTypeWarning, events.BackOffStartContainer, "Back-off restarting failed container")
|
||||
}
|
||||
err := fmt.Errorf("Back-off %s restarting failed container=%s pod=%s", backOff.Get(key), container.Name, format.Pod(pod))
|
||||
err := fmt.Errorf("back-off %s restarting failed container=%s pod=%s", backOff.Get(key), container.Name, format.Pod(pod))
|
||||
klog.V(3).Infof("%s", err.Error())
|
||||
return true, err.Error(), kubecontainer.ErrCrashLoopBackOff
|
||||
}
|
||||
|
@ -49,8 +49,6 @@ import (
|
||||
const (
|
||||
// timeFormat is the time format used in the log.
|
||||
timeFormat = time.RFC3339Nano
|
||||
// blockSize is the block size used in tail.
|
||||
blockSize = 1024
|
||||
|
||||
// stateCheckPeriod is the period to check container state while following
|
||||
// the container log. Kubelet should not keep following the log when the
|
||||
|
@ -74,7 +74,7 @@ func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *v1.Pod,
|
||||
}
|
||||
return msg, err
|
||||
default:
|
||||
err := fmt.Errorf("Invalid handler: %v", handler)
|
||||
err := fmt.Errorf("invalid handler: %v", handler)
|
||||
msg := fmt.Sprintf("Cannot run handler: %v", err)
|
||||
klog.Errorf(msg)
|
||||
return msg, err
|
||||
|
@ -26,9 +26,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
quantity = *resource.NewQuantity(1, resource.DecimalSI)
|
||||
extendedResourceName1 = "example.com/er1"
|
||||
extendedResourceName2 = "example.com/er2"
|
||||
quantity = *resource.NewQuantity(1, resource.DecimalSI)
|
||||
)
|
||||
|
||||
func TestRemoveMissingExtendedResources(t *testing.T) {
|
||||
|
@ -191,8 +191,6 @@ func (c *Configurer) CheckLimitsForResolvConf() {
|
||||
klog.V(4).Infof("CheckLimitsForResolvConf: " + log)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// parseResolvConf reads a resolv.conf file from the given reader, and parses
|
||||
@ -396,10 +394,10 @@ func (c *Configurer) SetupDNSinContainerizedMounter(mounterPath string) {
|
||||
}
|
||||
if c.ResolverConfig != "" {
|
||||
f, err := os.Open(c.ResolverConfig)
|
||||
defer f.Close()
|
||||
if err != nil {
|
||||
klog.Error("Could not open resolverConf file")
|
||||
} else {
|
||||
defer f.Close()
|
||||
_, hostSearch, _, err := parseResolvConf(f)
|
||||
if err != nil {
|
||||
klog.Errorf("Error for parsing the resolv.conf file: %v", err)
|
||||
|
@ -487,7 +487,7 @@ func ReadyCondition(
|
||||
}
|
||||
}
|
||||
if len(missingCapacities) > 0 {
|
||||
errs = append(errs, fmt.Errorf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
|
||||
errs = append(errs, fmt.Errorf("missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
|
||||
}
|
||||
if aggregatedErr := errors.NewAggregate(errs); aggregatedErr != nil {
|
||||
newNodeReadyCondition = v1.NodeCondition{
|
||||
|
@ -1016,7 +1016,7 @@ func TestReadyCondition(t *testing.T) {
|
||||
{
|
||||
desc: "new, not ready: missing capacities",
|
||||
node: &v1.Node{},
|
||||
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "Missing node capacity for resources: cpu, memory, pods, ephemeral-storage", now, now)},
|
||||
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "missing node capacity for resources: cpu, memory, pods, ephemeral-storage", now, now)},
|
||||
},
|
||||
// the transition tests ensure timestamps are set correctly, no need to test the entire condition matrix in this section
|
||||
{
|
||||
|
@ -253,7 +253,7 @@ func (g *GenericPLEG) relist() {
|
||||
needsReinspection[pid] = pod
|
||||
|
||||
continue
|
||||
} else if _, found := g.podsToReinspect[pid]; found {
|
||||
} else {
|
||||
// this pod was in the list to reinspect and we did so because it had events, so remove it
|
||||
// from the list (we don't want the reinspection code below to inspect it a second time in
|
||||
// this relist execution)
|
||||
|
@ -46,7 +46,6 @@ go_test(
|
||||
deps = [
|
||||
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/pluginmanager/cache:go_default_library",
|
||||
"//pkg/kubelet/pluginmanager/pluginwatcher:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
|
1
pkg/kubelet/pluginmanager/cache/BUILD
vendored
1
pkg/kubelet/pluginmanager/cache/BUILD
vendored
@ -19,6 +19,7 @@ go_test(
|
||||
"desired_state_of_world_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = ["//vendor/github.com/stretchr/testify/require:go_default_library"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
|
@ -87,7 +87,7 @@ func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {
|
||||
defer asw.Unlock()
|
||||
|
||||
if pluginInfo.SocketPath == "" {
|
||||
return fmt.Errorf("Socket path is empty")
|
||||
return fmt.Errorf("socket path is empty")
|
||||
}
|
||||
if _, ok := asw.socketFileToInfo[pluginInfo.SocketPath]; ok {
|
||||
klog.V(2).Infof("Plugin (Path %s) exists in actual state cache", pluginInfo.SocketPath)
|
||||
|
@ -19,6 +19,8 @@ package cache
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Calls AddPlugin() to add a plugin
|
||||
@ -63,10 +65,7 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err := asw.AddPlugin(pluginInfo)
|
||||
// Assert
|
||||
if err == nil || err.Error() != "Socket path is empty" {
|
||||
t.Fatalf("AddOrUpdatePlugin failed. Expected: <Socket path is empty> Actual: <%v>", err)
|
||||
}
|
||||
require.EqualError(t, err, "socket path is empty")
|
||||
|
||||
// Get registered plugins and check the newly added plugin is there
|
||||
aswPlugins := asw.GetRegisteredPlugins()
|
||||
|
@ -125,7 +125,7 @@ func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string, foundInDepr
|
||||
defer dsw.Unlock()
|
||||
|
||||
if socketPath == "" {
|
||||
return fmt.Errorf("Socket path is empty")
|
||||
return fmt.Errorf("socket path is empty")
|
||||
}
|
||||
if _, ok := dsw.socketFileToInfo[socketPath]; ok {
|
||||
klog.V(2).Infof("Plugin (Path %s) exists in actual state cache, timestamp will be updated", socketPath)
|
||||
|
@ -18,6 +18,8 @@ package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Calls AddOrUpdatePlugin() to add a plugin
|
||||
@ -96,10 +98,7 @@ func Test_DSW_AddOrUpdatePlugin_Negative_PluginMissingInfo(t *testing.T) {
|
||||
dsw := NewDesiredStateOfWorld()
|
||||
socketPath := ""
|
||||
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
|
||||
// Assert
|
||||
if err == nil || err.Error() != "Socket path is empty" {
|
||||
t.Fatalf("AddOrUpdatePlugin failed. Expected: <Socket path is empty> Actual: <%v>", err)
|
||||
}
|
||||
require.EqualError(t, err, "socket path is empty")
|
||||
|
||||
// Get pluginsToRegister and check the newly added plugin is there
|
||||
dswPlugins := dsw.GetPluginsToRegister()
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
pluginNameNotAvailable = "N/A"
|
||||
// Metric keys for Plugin Manager.
|
||||
pluginManagerTotalPlugins = "plugin_manager_total_plugins"
|
||||
)
|
||||
|
@ -28,30 +28,17 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
|
||||
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
|
||||
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
|
||||
)
|
||||
|
||||
const (
|
||||
testHostname = "test-hostname"
|
||||
)
|
||||
|
||||
var (
|
||||
socketDir string
|
||||
deprecatedSocketDir string
|
||||
supportedVersions = []string{"v1beta1", "v1beta2"}
|
||||
)
|
||||
|
||||
// fake cache.PluginHandler
|
||||
type PluginHandler interface {
|
||||
ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error
|
||||
RegisterPlugin(pluginName, endpoint string, versions []string) error
|
||||
DeRegisterPlugin(pluginName string)
|
||||
}
|
||||
|
||||
type fakePluginHandler struct {
|
||||
validatePluginCalled bool
|
||||
registerPluginCalled bool
|
||||
@ -88,7 +75,6 @@ func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
f.deregisterPluginCalled = true
|
||||
return
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -113,20 +99,6 @@ func cleanup(t *testing.T) {
|
||||
os.MkdirAll(deprecatedSocketDir, 0755)
|
||||
}
|
||||
|
||||
func newWatcher(
|
||||
t *testing.T, testDeprecatedDir bool,
|
||||
desiredStateOfWorldCache cache.DesiredStateOfWorld) *pluginwatcher.Watcher {
|
||||
|
||||
depSocketDir := ""
|
||||
if testDeprecatedDir {
|
||||
depSocketDir = deprecatedSocketDir
|
||||
}
|
||||
w := pluginwatcher.NewWatcher(socketDir, depSocketDir, desiredStateOfWorldCache)
|
||||
require.NoError(t, w.Start(wait.NeverStop))
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
|
||||
err := retryWithExponentialBackOff(
|
||||
time.Duration(500*time.Millisecond),
|
||||
@ -169,7 +141,7 @@ func TestPluginRegistration(t *testing.T) {
|
||||
|
||||
// Add handler for device plugin
|
||||
fakeHandler := newFakePluginHandler()
|
||||
pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, fakeHandler)
|
||||
pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
|
||||
|
||||
// Add a new plugin
|
||||
socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
|
||||
|
@ -39,8 +39,7 @@ type exampleHandler struct {
|
||||
|
||||
eventChans map[string]chan examplePluginEvent // map[pluginName]eventChan
|
||||
|
||||
m sync.Mutex
|
||||
count int
|
||||
m sync.Mutex
|
||||
|
||||
permitDeprecatedDir bool
|
||||
}
|
||||
@ -95,7 +94,7 @@ func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []
|
||||
// Verifies the grpcServer is ready to serve services.
|
||||
_, conn, err := dial(endpoint, time.Second)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed dialing endpoint (%s): %v", endpoint, err)
|
||||
return fmt.Errorf("failed dialing endpoint (%s): %v", endpoint, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
@ -106,13 +105,13 @@ func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []
|
||||
// Tests v1beta1 GetExampleInfo
|
||||
_, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
|
||||
return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
|
||||
}
|
||||
|
||||
// Tests v1beta1 GetExampleInfo
|
||||
_, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
|
||||
return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -134,13 +134,11 @@ func (e *examplePlugin) Serve(services ...string) error {
|
||||
case "v1beta1":
|
||||
v1beta1 := &pluginServiceV1Beta1{server: e}
|
||||
v1beta1.RegisterService()
|
||||
break
|
||||
case "v1beta2":
|
||||
v1beta2 := &pluginServiceV1Beta2{server: e}
|
||||
v1beta2.RegisterService()
|
||||
break
|
||||
default:
|
||||
return fmt.Errorf("Unsupported service: '%s'", service)
|
||||
return fmt.Errorf("unsupported service: '%s'", service)
|
||||
}
|
||||
}
|
||||
|
||||
@ -171,7 +169,7 @@ func (e *examplePlugin) Stop() error {
|
||||
case <-c:
|
||||
break
|
||||
case <-time.After(time.Second):
|
||||
return errors.New("Timed out on waiting for stop completion")
|
||||
return errors.New("timed out on waiting for stop completion")
|
||||
}
|
||||
|
||||
if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) {
|
||||
|
@ -287,27 +287,6 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
|
||||
select {
|
||||
case status := <-statusChan:
|
||||
return status.PluginRegistered
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("Timed out while waiting for registration status")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan examplePluginEvent) bool {
|
||||
select {
|
||||
case event := <-eventChan:
|
||||
return event == expected
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("Timed out while waiting for registration status %v", expected)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func newWatcher(t *testing.T, testDeprecatedDir bool, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
|
||||
depSocketDir := ""
|
||||
if testDeprecatedDir {
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
|
||||
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
|
||||
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
|
||||
@ -115,9 +114,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
|
||||
return wait.ExponentialBackoff(backoff, fn)
|
||||
}
|
||||
|
||||
type DummyImpl struct {
|
||||
dummy string
|
||||
}
|
||||
type DummyImpl struct{}
|
||||
|
||||
func NewDummyImpl() *DummyImpl {
|
||||
return &DummyImpl{}
|
||||
@ -185,7 +182,7 @@ func Test_Run_Positive_Register(t *testing.T) {
|
||||
dsw,
|
||||
asw,
|
||||
)
|
||||
reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
|
||||
reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
|
||||
|
||||
// Start the reconciler to fill ASW.
|
||||
stopChan := make(chan struct{})
|
||||
@ -230,7 +227,7 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
|
||||
dsw,
|
||||
asw,
|
||||
)
|
||||
reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
|
||||
reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
|
||||
|
||||
// Start the reconciler to fill ASW.
|
||||
stopChan := make(chan struct{})
|
||||
@ -285,7 +282,7 @@ func Test_Run_Positive_ReRegister(t *testing.T) {
|
||||
dsw,
|
||||
asw,
|
||||
)
|
||||
reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
|
||||
reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
|
||||
|
||||
// Start the reconciler to fill ASW.
|
||||
stopChan := make(chan struct{})
|
||||
|
@ -92,7 +92,6 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string, uid *types.UID)
|
||||
}
|
||||
klog.V(2).Infof("Deleting a mirror pod %q (uid %#v)", podFullName, uid)
|
||||
var GracePeriodSeconds int64
|
||||
GracePeriodSeconds = 0
|
||||
if err := mc.apiserverClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &GracePeriodSeconds, Preconditions: &metav1.Preconditions{UID: uid}}); err != nil {
|
||||
// Unfortunately, there's no generic error for failing a precondition
|
||||
if !(errors.IsNotFound(err) || errors.IsConflict(err)) {
|
||||
|
@ -241,9 +241,7 @@ func (p *podWorkers) removeWorker(uid types.UID) {
|
||||
// If there is an undelivered work update for this pod we need to remove it
|
||||
// since per-pod goroutine won't be able to put it to the already closed
|
||||
// channel when it finishes processing the current work update.
|
||||
if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
|
||||
delete(p.lastUndeliveredWorkUpdate, uid)
|
||||
}
|
||||
delete(p.lastUndeliveredWorkUpdate, uid)
|
||||
}
|
||||
}
|
||||
func (p *podWorkers) ForgetWorker(uid types.UID) {
|
||||
|
@ -87,7 +87,7 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
|
||||
case liveness:
|
||||
probeSpec = container.LivenessProbe
|
||||
default:
|
||||
return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
|
||||
return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
|
||||
}
|
||||
|
||||
ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
|
||||
@ -193,7 +193,7 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
|
||||
return pb.tcp.Probe(host, port, timeout)
|
||||
}
|
||||
klog.Warningf("Failed to find probe builder for container: %v", container)
|
||||
return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
|
||||
return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
|
||||
}
|
||||
|
||||
func extractPort(param intstr.IntOrString, container v1.Container) (int, error) {
|
||||
@ -210,7 +210,7 @@ func extractPort(param intstr.IntOrString, container v1.Container) (int, error)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return port, fmt.Errorf("IntOrString had no kind: %+v", param)
|
||||
return port, fmt.Errorf("intOrString had no kind: %+v", param)
|
||||
}
|
||||
if port > 0 && port < 65536 {
|
||||
return port, nil
|
||||
|
@ -208,8 +208,6 @@ func (f *RemoteRuntime) ExecSync(ctx context.Context, req *kubeapi.ExecSyncReque
|
||||
return nil, err
|
||||
}
|
||||
exitCode = int32(exitError.ExitStatus())
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &kubeapi.ExecSyncResponse{
|
||||
|
@ -116,7 +116,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
|
||||
for {
|
||||
status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to get status for pod %q: %v", format.Pod(pod), err)
|
||||
return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
|
||||
}
|
||||
|
||||
if kl.isPodRunning(pod, status) {
|
||||
|
@ -71,7 +71,7 @@ func (m *Manager) LookupRuntimeHandler(runtimeClassName *string) (string, error)
|
||||
if errors.IsNotFound(err) {
|
||||
return "", err
|
||||
}
|
||||
return "", fmt.Errorf("Failed to lookup RuntimeClass %s: %v", name, err)
|
||||
return "", fmt.Errorf("failed to lookup RuntimeClass %s: %v", name, err)
|
||||
}
|
||||
|
||||
return rc.Handler, nil
|
||||
|
@ -46,7 +46,7 @@ func handleHTTPStreams(req *http.Request, w http.ResponseWriter, portForwarder P
|
||||
upgrader := spdy.NewResponseUpgrader()
|
||||
conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan))
|
||||
if conn == nil {
|
||||
return errors.New("Unable to upgrade httpstream connection")
|
||||
return errors.New("unable to upgrade httpstream connection")
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
|
@ -115,7 +115,7 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar
|
||||
conn.SetIdleTimeout(idleTimeout)
|
||||
_, streams, err := conn.Open(httplog.Unlogged(req, w), req)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Unable to upgrade websocket connection: %v", err)
|
||||
err = fmt.Errorf("unable to upgrade websocket connection: %v", err)
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
@ -232,7 +232,7 @@ WaitForStreams:
|
||||
ctx.resizeStream = stream
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
default:
|
||||
runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
|
||||
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
|
||||
}
|
||||
case <-replyChan:
|
||||
receivedStreams++
|
||||
@ -283,7 +283,7 @@ WaitForStreams:
|
||||
ctx.resizeStream = stream
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
default:
|
||||
runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
|
||||
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
|
||||
}
|
||||
case <-replyChan:
|
||||
receivedStreams++
|
||||
@ -331,7 +331,7 @@ WaitForStreams:
|
||||
ctx.stderrStream = stream
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
default:
|
||||
runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
|
||||
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
|
||||
}
|
||||
case <-replyChan:
|
||||
receivedStreams++
|
||||
@ -385,7 +385,7 @@ WaitForStreams:
|
||||
ctx.stderrStream = stream
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
default:
|
||||
runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
|
||||
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
|
||||
}
|
||||
case <-replyChan:
|
||||
receivedStreams++
|
||||
|
@ -97,7 +97,7 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Opti
|
||||
conn.SetIdleTimeout(idleTimeout)
|
||||
negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(req, w), req)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err))
|
||||
runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))
|
||||
return nil, false
|
||||
}
|
||||
|
||||
|
@ -296,6 +296,7 @@ func (s *Server) InstallDefaultHandlers(enableCAdvisorJSONEndpoints bool) {
|
||||
s.restfulCont.Add(ws)
|
||||
|
||||
s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer, enableCAdvisorJSONEndpoints))
|
||||
//lint:ignore SA1019 https://github.com/kubernetes/enhancements/issues/1206
|
||||
s.restfulCont.Handle(metricsPath, prometheus.Handler())
|
||||
|
||||
// cAdvisor metrics are exposed under the secured handler as well
|
||||
@ -661,9 +662,7 @@ func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams
|
||||
}
|
||||
}
|
||||
|
||||
type responder struct {
|
||||
errorMessage string
|
||||
}
|
||||
type responder struct{}
|
||||
|
||||
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
klog.Errorf("Error while proxying request: %v", err)
|
||||
|
@ -288,13 +288,11 @@ func (f *fakeAuth) Authorize(a authorizer.Attributes) (authorized authorizer.Dec
|
||||
}
|
||||
|
||||
type serverTestFramework struct {
|
||||
serverUnderTest *Server
|
||||
fakeKubelet *fakeKubelet
|
||||
fakeAuth *fakeAuth
|
||||
testHTTPServer *httptest.Server
|
||||
fakeRuntime *fakeRuntime
|
||||
testStreamingHTTPServer *httptest.Server
|
||||
criHandler *utiltesting.FakeHandler
|
||||
serverUnderTest *Server
|
||||
fakeKubelet *fakeKubelet
|
||||
fakeAuth *fakeAuth
|
||||
testHTTPServer *httptest.Server
|
||||
criHandler *utiltesting.FakeHandler
|
||||
}
|
||||
|
||||
func newServerTest() *serverTestFramework {
|
||||
@ -676,11 +674,6 @@ func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) {
|
||||
}
|
||||
}
|
||||
|
||||
type authTestCase struct {
|
||||
Method string
|
||||
Path string
|
||||
}
|
||||
|
||||
// Ensure all registered handlers & services have an associated testcase.
|
||||
func TestAuthzCoverage(t *testing.T) {
|
||||
fw := newServerTest()
|
||||
|
@ -23,7 +23,6 @@ go_library(
|
||||
"//staging/src/k8s.io/client-go/tools/remotecommand:go_default_library",
|
||||
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
|
||||
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
||||
"//vendor/google.golang.org/grpc:go_default_library",
|
||||
"//vendor/google.golang.org/grpc/codes:go_default_library",
|
||||
"//vendor/google.golang.org/grpc/status:go_default_library",
|
||||
],
|
||||
|
@ -20,25 +20,25 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// NewErrorStreamingDisabled creates an error for disabled streaming method.
|
||||
func NewErrorStreamingDisabled(method string) error {
|
||||
return status.Errorf(codes.NotFound, "streaming method %s disabled", method)
|
||||
return grpcstatus.Errorf(codes.NotFound, "streaming method %s disabled", method)
|
||||
}
|
||||
|
||||
// NewErrorTooManyInFlight creates an error for exceeding the maximum number of in-flight requests.
|
||||
func NewErrorTooManyInFlight() error {
|
||||
return status.Error(codes.ResourceExhausted, "maximum number of in-flight requests exceeded")
|
||||
return grpcstatus.Error(codes.ResourceExhausted, "maximum number of in-flight requests exceeded")
|
||||
}
|
||||
|
||||
// WriteError translates a CRI streaming error into an appropriate HTTP response.
|
||||
func WriteError(err error, w http.ResponseWriter) error {
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
var status int
|
||||
switch grpc.Code(err) {
|
||||
switch s.Code() {
|
||||
case codes.NotFound:
|
||||
status = http.StatusNotFound
|
||||
case codes.ResourceExhausted:
|
||||
|
@ -68,10 +68,11 @@ func TestInsert(t *testing.T) {
|
||||
// Insert again (should evict)
|
||||
_, err = c.Insert(nextRequest())
|
||||
assert.Error(t, err, "should reject further requests")
|
||||
errResponse := httptest.NewRecorder()
|
||||
require.NoError(t, WriteError(err, errResponse))
|
||||
assert.Equal(t, errResponse.Code, http.StatusTooManyRequests)
|
||||
assert.Equal(t, strconv.Itoa(int(cacheTTL.Seconds())), errResponse.HeaderMap.Get("Retry-After"))
|
||||
recorder := httptest.NewRecorder()
|
||||
require.NoError(t, WriteError(err, recorder))
|
||||
errResponse := recorder.Result()
|
||||
assert.Equal(t, errResponse.StatusCode, http.StatusTooManyRequests)
|
||||
assert.Equal(t, strconv.Itoa(int(cacheTTL.Seconds())), errResponse.Header.Get("Retry-After"))
|
||||
|
||||
assertCacheSize(t, c, maxInFlight)
|
||||
_, ok = c.Consume(oldestTok)
|
||||
|
@ -316,29 +316,29 @@ func (p *criStatsProvider) ImageFsStats() (*statsapi.FsStats, error) {
|
||||
// return the first one.
|
||||
//
|
||||
// TODO(yguo0905): Support returning stats of multiple image filesystems.
|
||||
for _, fs := range resp {
|
||||
s := &statsapi.FsStats{
|
||||
Time: metav1.NewTime(time.Unix(0, fs.Timestamp)),
|
||||
UsedBytes: &fs.UsedBytes.Value,
|
||||
}
|
||||
if fs.InodesUsed != nil {
|
||||
s.InodesUsed = &fs.InodesUsed.Value
|
||||
}
|
||||
imageFsInfo := p.getFsInfo(fs.GetFsId())
|
||||
if imageFsInfo != nil {
|
||||
// The image filesystem id is unknown to the local node or there's
|
||||
// an error on retrieving the stats. In these cases, we omit those
|
||||
// stats and return the best-effort partial result. See
|
||||
// https://github.com/kubernetes/heapster/issues/1793.
|
||||
s.AvailableBytes = &imageFsInfo.Available
|
||||
s.CapacityBytes = &imageFsInfo.Capacity
|
||||
s.InodesFree = imageFsInfo.InodesFree
|
||||
s.Inodes = imageFsInfo.Inodes
|
||||
}
|
||||
return s, nil
|
||||
if len(resp) == 0 {
|
||||
return nil, fmt.Errorf("imageFs information is unavailable")
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("imageFs information is unavailable")
|
||||
fs := resp[0]
|
||||
s := &statsapi.FsStats{
|
||||
Time: metav1.NewTime(time.Unix(0, fs.Timestamp)),
|
||||
UsedBytes: &fs.UsedBytes.Value,
|
||||
}
|
||||
if fs.InodesUsed != nil {
|
||||
s.InodesUsed = &fs.InodesUsed.Value
|
||||
}
|
||||
imageFsInfo := p.getFsInfo(fs.GetFsId())
|
||||
if imageFsInfo != nil {
|
||||
// The image filesystem id is unknown to the local node or there's
|
||||
// an error on retrieving the stats. In these cases, we omit those
|
||||
// stats and return the best-effort partial result. See
|
||||
// https://github.com/kubernetes/heapster/issues/1793.
|
||||
s.AvailableBytes = &imageFsInfo.Available
|
||||
s.CapacityBytes = &imageFsInfo.Capacity
|
||||
s.InodesFree = imageFsInfo.InodesFree
|
||||
s.Inodes = imageFsInfo.Inodes
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ImageFsDevice returns name of the device where the image filesystem locates,
|
||||
|
@ -150,6 +150,7 @@ func (m *manager) Start() {
|
||||
}
|
||||
|
||||
klog.Info("Starting to sync pod status with apiserver")
|
||||
//lint:ignore SA1015 Ticker can link since this is only called once and doesn't handle termination.
|
||||
syncTicker := time.Tick(syncPeriod)
|
||||
// syncPod and syncBatch share the same go routine to avoid sync races.
|
||||
go wait.Forever(func() {
|
||||
|
@ -88,9 +88,8 @@ func GetValidatedSources(sources []string) ([]string, error) {
|
||||
return []string{FileSource, HTTPSource, ApiserverSource}, nil
|
||||
case FileSource, HTTPSource, ApiserverSource:
|
||||
validated = append(validated, source)
|
||||
break
|
||||
case "":
|
||||
break
|
||||
// Skip
|
||||
default:
|
||||
return []string{}, fmt.Errorf("unknown pod source %q", source)
|
||||
}
|
||||
@ -165,10 +164,7 @@ func Preemptable(preemptor, preemptee *v1.Pod) bool {
|
||||
|
||||
// IsCriticalPodBasedOnPriority checks if the given pod is a critical pod based on priority resolved from pod Spec.
|
||||
func IsCriticalPodBasedOnPriority(priority int32) bool {
|
||||
if priority >= scheduling.SystemCriticalPriority {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
return priority >= scheduling.SystemCriticalPriority
|
||||
}
|
||||
|
||||
// IsStaticPod returns true if the pod is a static pod.
|
||||
|
@ -42,7 +42,7 @@ func TestGetValidatedSources(t *testing.T) {
|
||||
require.Len(t, sources, 3)
|
||||
|
||||
// Unknown source.
|
||||
sources, err = GetValidatedSources([]string{"taco"})
|
||||
_, err = GetValidatedSources([]string{"taco"})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,3 @@ func dirExists(path string) bool {
|
||||
}
|
||||
return s.IsDir()
|
||||
}
|
||||
|
||||
// empty is a placeholder type used to implement a set
|
||||
type empty struct{}
|
||||
|
@ -68,5 +68,5 @@ func aggregatePods(pods []*v1.Pod, handler podHandler) string {
|
||||
for _, pod := range pods {
|
||||
podStrings = append(podStrings, handler(pod))
|
||||
}
|
||||
return fmt.Sprintf(strings.Join(podStrings, ", "))
|
||||
return strings.Join(podStrings, ", ")
|
||||
}
|
||||
|
@ -120,7 +120,7 @@ func parseEndpoint(endpoint string) (string, string, error) {
|
||||
return "unix", u.Path, nil
|
||||
|
||||
case "":
|
||||
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
|
||||
return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint)
|
||||
|
||||
default:
|
||||
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
|
||||
|
@ -97,7 +97,7 @@ func NewInitializedVolumePluginMgr(
|
||||
|
||||
if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"Could not initialize volume plugins for KubeletVolumePluginMgr: %v",
|
||||
"could not initialize volume plugins for KubeletVolumePluginMgr: %v",
|
||||
err)
|
||||
}
|
||||
|
||||
|
1
pkg/kubelet/volumemanager/cache/BUILD
vendored
1
pkg/kubelet/volumemanager/cache/BUILD
vendored
@ -43,6 +43,7 @@ go_test(
|
||||
"//pkg/volume/util/types:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -19,6 +19,7 @@ package cache
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
@ -359,8 +360,10 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
|
||||
volumeSpec2 := &volume.Spec{Volume: &pod2.Spec.Volumes[0]}
|
||||
generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec(
|
||||
plugin, volumeSpec1)
|
||||
require.NoError(t, err)
|
||||
generatedVolumeName2, err := util.GetUniqueVolumeNameFromSpec(
|
||||
plugin, volumeSpec2)
|
||||
require.NoError(t, err)
|
||||
|
||||
if generatedVolumeName1 != generatedVolumeName2 {
|
||||
t.Fatalf(
|
||||
@ -466,6 +469,7 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
|
||||
|
||||
volumeName, err := util.GetUniqueVolumeNameFromSpec(
|
||||
plugin, volumeSpec)
|
||||
require.NoError(t, err)
|
||||
|
||||
podName := util.GetUniquePodName(pod)
|
||||
|
||||
|
@ -125,7 +125,6 @@ type reconciler struct {
|
||||
kubeClient clientset.Interface
|
||||
controllerAttachDetachEnabled bool
|
||||
loopSleepDuration time.Duration
|
||||
syncDuration time.Duration
|
||||
waitForAttachTimeout time.Duration
|
||||
nodeName types.NodeName
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||
@ -355,7 +354,6 @@ type reconstructedVolume struct {
|
||||
attachablePlugin volumepkg.AttachableVolumePlugin
|
||||
volumeGidValue string
|
||||
devicePath string
|
||||
reportedInUse bool
|
||||
mounter volumepkg.Mounter
|
||||
blockVolumeMapper volumepkg.BlockVolumeMapper
|
||||
}
|
||||
@ -472,7 +470,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
|
||||
}
|
||||
// TODO: remove feature gate check after no longer needed
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) && volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
|
||||
return nil, fmt.Errorf("Could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
|
||||
return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
|
||||
}
|
||||
|
||||
volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
|
||||
@ -545,7 +543,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
|
||||
}
|
||||
// If mount or symlink doesn't exist, volume reconstruction should be failed
|
||||
if !isExist {
|
||||
return nil, fmt.Errorf("Volume: %q is not mounted", uniqueVolumeName)
|
||||
return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName)
|
||||
}
|
||||
|
||||
reconstructedVolume := &reconstructedVolume{
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user