mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #122147 from pohly/kubelet-context-support
kubelet: enhance context support
This commit is contained in:
commit
7abf6770fd
@ -524,11 +524,11 @@ func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
|
||||
}
|
||||
|
||||
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
|
||||
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
|
||||
func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
|
||||
if kubeDeps.Recorder != nil {
|
||||
return
|
||||
}
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
|
||||
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
|
||||
eventBroadcaster.StartStructuredLogging(3)
|
||||
if kubeDeps.EventClient != nil {
|
||||
@ -738,7 +738,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
|
||||
}
|
||||
|
||||
// Setup event recorder if required.
|
||||
makeEventRecorder(kubeDeps, nodeName)
|
||||
makeEventRecorder(ctx, kubeDeps, nodeName)
|
||||
|
||||
if kubeDeps.ContainerManager == nil {
|
||||
if s.CgroupsPerQOS && s.CgroupRoot == "" {
|
||||
@ -1206,7 +1206,7 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
|
||||
}
|
||||
hostnameOverridden := len(kubeServer.HostnameOverride) > 0
|
||||
// Setup event recorder if required.
|
||||
makeEventRecorder(kubeDeps, nodeName)
|
||||
makeEventRecorder(context.TODO(), kubeDeps, nodeName)
|
||||
|
||||
nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs))
|
||||
if err != nil {
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -35,6 +35,7 @@ import (
|
||||
"k8s.io/client-go/tools/record"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/securitycontext"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -93,7 +94,7 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
|
||||
}
|
||||
|
||||
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
|
||||
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
|
||||
channel := config.Channel(ctx, TestSource)
|
||||
ch := config.Updates()
|
||||
@ -136,7 +137,8 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
|
||||
}
|
||||
|
||||
func TestNewPodAdded(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -151,7 +153,8 @@ func TestNewPodAdded(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedInvalidNamespace(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -166,7 +169,8 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedDefaultNamespace(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -181,7 +185,8 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -201,7 +206,8 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestInvalidPodFiltered(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -218,7 +224,8 @@ func TestInvalidPodFiltered(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates)
|
||||
@ -239,7 +246,8 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedSnapshot(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot)
|
||||
@ -260,7 +268,8 @@ func TestNewPodAddedSnapshot(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedUpdatedRemoved(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -286,7 +295,8 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedDelete(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -308,7 +318,8 @@ func TestNewPodAddedDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedUpdatedSet(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -333,7 +344,8 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewPodAddedSetReconciled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
|
||||
@ -381,7 +393,8 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestInitialEmptySet(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
for _, test := range []struct {
|
||||
@ -409,7 +422,8 @@ func TestInitialEmptySet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPodUpdateAnnotations(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -441,7 +455,8 @@ func TestPodUpdateAnnotations(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPodUpdateLabels(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
|
||||
@ -464,7 +479,11 @@ func TestPodUpdateLabels(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPodConfigRace(t *testing.T) {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
|
||||
config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
|
||||
seenSources := sets.NewString(TestSource)
|
||||
var wg sync.WaitGroup
|
||||
@ -472,7 +491,7 @@ func TestPodConfigRace(t *testing.T) {
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
defer wg.Done()
|
||||
for i := 0; i < iterations; i++ {
|
||||
|
Loading…
Reference in New Issue
Block a user