mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 03:57:41 +00:00
Merge pull request #116930 from fatsheep9146/contextual-logging-cleanup
contextual logging cleanup
This commit is contained in:
commit
8c1bf4f461
@ -377,6 +377,7 @@ func startEndpointController(ctx context.Context, controllerCtx ControllerContex
|
||||
|
||||
func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
go replicationcontroller.NewReplicationManager(
|
||||
klog.FromContext(ctx),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
|
||||
|
@ -30,7 +30,6 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
|
||||
# A few files involved in startup migrated already to contextual
|
||||
# We can't enable contextual logcheck until all are migrated
|
||||
contextual k8s.io/dynamic-resource-allocation/.*
|
||||
contextual k8s.io/kubernetes/cmd/kube-controller-manager/.*
|
||||
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
|
||||
contextual k8s.io/kubernetes/pkg/controller/.*
|
||||
contextual k8s.io/kubernetes/test/e2e/dra/.*
|
||||
@ -39,7 +38,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
|
||||
# this point it is easier to list the exceptions.
|
||||
-contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/deployment/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/disruption/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/endpointslice/.*
|
||||
@ -48,12 +46,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/nodeipam/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/podgc/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/replicaset/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/util/.*
|
||||
-contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing/testvolumespec.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/volume/expand/expand_controller.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_test.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/volume_host.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go
|
||||
-contextual k8s.io/kubernetes/pkg/controller/volume/pvprotection/pv_protection_controller_test.go
|
||||
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"time"
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@ -582,7 +582,7 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, key string)
|
||||
logger := klog.FromContext(ctx)
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
|
||||
logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
@ -41,7 +41,7 @@ func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment
|
||||
rollbackTo := getRollbackTo(d)
|
||||
// If rollback revision is 0, rollback to the last revision
|
||||
if rollbackTo.Revision == 0 {
|
||||
if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
|
||||
if rollbackTo.Revision = deploymentutil.LastRevision(logger, allRSs); rollbackTo.Revision == 0 {
|
||||
// If we still can't find the last revision, gives up rollback
|
||||
dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
|
||||
// Gives up rollback
|
||||
|
@ -24,7 +24,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
@ -140,7 +140,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
|
||||
existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)
|
||||
|
||||
// Calculate the max revision number among all old RSes
|
||||
maxOldRevision := deploymentutil.MaxRevision(oldRSs)
|
||||
maxOldRevision := deploymentutil.MaxRevision(logger, oldRSs)
|
||||
// Calculate revision number for this new replica set
|
||||
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
|
||||
|
||||
|
@ -184,12 +184,12 @@ func SetDeploymentRevision(deployment *apps.Deployment, revision string) bool {
|
||||
}
|
||||
|
||||
// MaxRevision finds the highest revision in the replica sets
|
||||
func MaxRevision(allRSs []*apps.ReplicaSet) int64 {
|
||||
func MaxRevision(logger klog.Logger, allRSs []*apps.ReplicaSet) int64 {
|
||||
max := int64(0)
|
||||
for _, rs := range allRSs {
|
||||
if v, err := Revision(rs); err != nil {
|
||||
// Skip the replica sets when it failed to parse their revision information
|
||||
klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
|
||||
logger.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
|
||||
} else if v > max {
|
||||
max = v
|
||||
}
|
||||
@ -198,12 +198,12 @@ func MaxRevision(allRSs []*apps.ReplicaSet) int64 {
|
||||
}
|
||||
|
||||
// LastRevision finds the second max revision number in all replica sets (the last revision)
|
||||
func LastRevision(allRSs []*apps.ReplicaSet) int64 {
|
||||
func LastRevision(logger klog.Logger, allRSs []*apps.ReplicaSet) int64 {
|
||||
max, secMax := int64(0), int64(0)
|
||||
for _, rs := range allRSs {
|
||||
if v, err := Revision(rs); err != nil {
|
||||
// Skip the replica sets when it failed to parse their revision information
|
||||
klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
|
||||
logger.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
|
||||
} else if v >= max {
|
||||
secMax = max
|
||||
max = v
|
||||
|
@ -187,7 +187,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
// Get the current resource list from discovery.
|
||||
newResources := GetDeletableResources(discoveryClient)
|
||||
newResources := GetDeletableResources(logger, discoveryClient)
|
||||
|
||||
// This can occur if there is an internal error in GetDeletableResources.
|
||||
if len(newResources) == 0 {
|
||||
@ -214,7 +214,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
|
||||
|
||||
// On a reattempt, check if available resources have changed
|
||||
if attempt > 1 {
|
||||
newResources = GetDeletableResources(discoveryClient)
|
||||
newResources = GetDeletableResources(logger, discoveryClient)
|
||||
if len(newResources) == 0 {
|
||||
logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
|
||||
metrics.GarbageCollectorResourcesSyncError.Inc()
|
||||
@ -809,13 +809,13 @@ func (gc *GarbageCollector) GraphHasUID(u types.UID) bool {
|
||||
// All discovery errors are considered temporary. Upon encountering any error,
|
||||
// GetDeletableResources will log and return any discovered resources it was
|
||||
// able to process (which may be none).
|
||||
func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
|
||||
func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
|
||||
preferredResources, err := discoveryClient.ServerPreferredResources()
|
||||
if err != nil {
|
||||
if discovery.IsGroupDiscoveryFailedError(err) {
|
||||
klog.Warningf("failed to discover some groups: %v", err.(*discovery.ErrGroupDiscoveryFailed).Groups)
|
||||
logger.Info("failed to discover some groups", "groups", err.(*discovery.ErrGroupDiscoveryFailed).Groups)
|
||||
} else {
|
||||
klog.Warningf("failed to discover preferred resources: %v", err)
|
||||
logger.Info("failed to discover preferred resources", "error", err)
|
||||
}
|
||||
}
|
||||
if preferredResources == nil {
|
||||
@ -829,7 +829,7 @@ func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) m
|
||||
for _, rl := range deletableResources {
|
||||
gv, err := schema.ParseGroupVersion(rl.GroupVersion)
|
||||
if err != nil {
|
||||
klog.Warningf("ignoring invalid discovered resource %q: %v", rl.GroupVersion, err)
|
||||
logger.Info("ignoring invalid discovered resource", "groupversion", rl.GroupVersion, "error", err)
|
||||
continue
|
||||
}
|
||||
for i := range rl.APIResources {
|
||||
|
@ -798,13 +798,14 @@ func TestGetDeletableResources(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
for name, test := range tests {
|
||||
t.Logf("testing %q", name)
|
||||
client := &fakeServerResources{
|
||||
PreferredResources: test.serverResources,
|
||||
Error: test.err,
|
||||
}
|
||||
actual := GetDeletableResources(client)
|
||||
actual := GetDeletableResources(logger, client)
|
||||
if !reflect.DeepEqual(test.deletableResources, actual) {
|
||||
t.Errorf("expected resources:\n%v\ngot:\n%v", test.deletableResources, actual)
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ func NewCloudCIDRAllocator(logger klog.Logger, client clientset.Interface, cloud
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
|
||||
return ca.ReleaseCIDR(logger, node)
|
||||
}),
|
||||
})
|
||||
@ -272,11 +272,11 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName
|
||||
|
||||
cidrStrings, err := ca.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
|
||||
if err != nil {
|
||||
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
|
||||
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
|
||||
return fmt.Errorf("failed to get cidr(s) from provider: %v", err)
|
||||
}
|
||||
if len(cidrStrings) == 0 {
|
||||
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
|
||||
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
|
||||
return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
|
||||
}
|
||||
//Can have at most 2 ips (one for v4 and one for v6)
|
||||
@ -311,7 +311,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
|
||||
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRAssignmentFailed")
|
||||
logger.Error(err, "Failed to update the node PodCIDR after multiple attempts", "node", klog.KObj(node), "cidrStrings", cidrStrings)
|
||||
return err
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ func (c *Controller) Start(logger klog.Logger, nodeInformer informers.NodeInform
|
||||
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
|
||||
return c.onUpdate(logger, newNode)
|
||||
}),
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
|
||||
return c.onDelete(logger, node)
|
||||
}),
|
||||
})
|
||||
|
@ -286,6 +286,7 @@ func (r *multiCIDRRangeAllocator) runCIDRWorker(ctx context.Context) {
|
||||
// processNextWorkItem will read a single work item off the cidrQueue and
|
||||
// attempt to process it, by calling the syncHandler.
|
||||
func (r *multiCIDRRangeAllocator) processNextCIDRWorkItem(ctx context.Context) bool {
|
||||
logger := klog.FromContext(ctx)
|
||||
obj, shutdown := r.cidrQueue.Get()
|
||||
if shutdown {
|
||||
return false
|
||||
@ -325,7 +326,7 @@ func (r *multiCIDRRangeAllocator) processNextCIDRWorkItem(ctx context.Context) b
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get cidrQueued again until another change happens.
|
||||
r.cidrQueue.Forget(obj)
|
||||
klog.Infof("Successfully synced '%s'", key)
|
||||
logger.Info("Successfully synced", "key", key)
|
||||
return nil
|
||||
}(ctx, obj)
|
||||
|
||||
@ -384,7 +385,7 @@ func (r *multiCIDRRangeAllocator) processNextNodeWorkItem(ctx context.Context) b
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get nodeQueue again until another change happens.
|
||||
r.nodeQueue.Forget(obj)
|
||||
klog.Infof("Successfully synced '%s'", key)
|
||||
logger.Info("Successfully synced", "key", key)
|
||||
return nil
|
||||
}(klog.FromContext(ctx), obj)
|
||||
|
||||
@ -399,12 +400,12 @@ func (r *multiCIDRRangeAllocator) processNextNodeWorkItem(ctx context.Context) b
|
||||
func (r *multiCIDRRangeAllocator) syncNode(logger klog.Logger, key string) error {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
klog.V(4).Infof("Finished syncing Node request %q (%v)", key, time.Since(startTime))
|
||||
logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime))
|
||||
}()
|
||||
|
||||
node, err := r.nodeLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(3).Infof("node has been deleted: %v", key)
|
||||
logger.V(3).Info("node has been deleted", "node", key)
|
||||
// TODO: obtain the node object information to call ReleaseCIDR from here
|
||||
// and retry if there is an error.
|
||||
return nil
|
||||
@ -414,7 +415,7 @@ func (r *multiCIDRRangeAllocator) syncNode(logger klog.Logger, key string) error
|
||||
}
|
||||
// Check the DeletionTimestamp to determine if object is under deletion.
|
||||
if !node.DeletionTimestamp.IsZero() {
|
||||
klog.V(3).Infof("node is being deleted: %v", key)
|
||||
logger.V(3).Info("node is being deleted", "node", key)
|
||||
return r.ReleaseCIDR(logger, node)
|
||||
}
|
||||
return r.AllocateOrOccupyCIDR(logger, node)
|
||||
@ -557,12 +558,12 @@ func (r *multiCIDRRangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node
|
||||
|
||||
cidrs, clusterCIDR, err := r.prioritizedCIDRs(logger, node)
|
||||
if err != nil {
|
||||
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
|
||||
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
|
||||
return fmt.Errorf("failed to get cidrs for node %s", node.Name)
|
||||
}
|
||||
|
||||
if len(cidrs) == 0 {
|
||||
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
|
||||
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
|
||||
return fmt.Errorf("no cidrSets with matching labels found for node %s", node.Name)
|
||||
}
|
||||
|
||||
@ -696,7 +697,7 @@ func (r *multiCIDRRangeAllocator) updateCIDRsAllocation(logger klog.Logger, data
|
||||
}
|
||||
// failed release back to the pool.
|
||||
logger.Error(err, "Failed to update node PodCIDR after attempts", "node", klog.KObj(node), "podCIDR", cidrsString, "retries", cidrUpdateRetries)
|
||||
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
|
||||
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
|
||||
// We accept the fact that we may leak CIDRs here. This is safer than releasing
|
||||
// them in case when we don't know if request went through.
|
||||
// NodeController restart will return all falsely allocated CIDRs to the pool.
|
||||
@ -1151,7 +1152,7 @@ func (r *multiCIDRRangeAllocator) reconcileDelete(ctx context.Context, clusterCI
|
||||
if slice.ContainsString(clusterCIDR.GetFinalizers(), clusterCIDRFinalizer, nil) {
|
||||
logger.V(2).Info("Releasing ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
|
||||
if err := r.deleteClusterCIDR(logger, clusterCIDR); err != nil {
|
||||
klog.V(2).Info("Error while deleting ClusterCIDR", "err", err)
|
||||
logger.V(2).Info("Error while deleting ClusterCIDR", "err", err)
|
||||
return err
|
||||
}
|
||||
// Remove the finalizer as delete is successful.
|
||||
|
@ -157,7 +157,7 @@ func NewCIDRRangeAllocator(logger klog.Logger, client clientset.Interface, nodeI
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
|
||||
return ra.ReleaseCIDR(logger, node)
|
||||
}),
|
||||
})
|
||||
@ -274,7 +274,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node)
|
||||
podCIDR, err := r.cidrSets[idx].AllocateNext()
|
||||
if err != nil {
|
||||
r.removeNodeFromProcessing(node.Name)
|
||||
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
|
||||
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
|
||||
return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
|
||||
}
|
||||
allocated.allocatedCIDRs[idx] = podCIDR
|
||||
@ -383,7 +383,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeRese
|
||||
}
|
||||
// failed release back to the pool
|
||||
logger.Error(err, "Failed to update node PodCIDR after multiple attempts", "node", klog.KObj(node), "podCIDRs", cidrsString)
|
||||
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
|
||||
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
|
||||
// We accept the fact that we may leak CIDRs here. This is safer than releasing
|
||||
// them in case when we don't know if request went through.
|
||||
// NodeController restart will return all falsely allocated CIDRs to the pool.
|
||||
|
@ -58,6 +58,8 @@ type fakeAPIs struct {
|
||||
calls []string
|
||||
events []fakeEvent
|
||||
results []error
|
||||
|
||||
logger klog.Logger
|
||||
}
|
||||
|
||||
func (f *fakeAPIs) Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error) {
|
||||
@ -90,7 +92,7 @@ func (f *fakeAPIs) EmitNodeWarningEvent(nodeName, reason, fmtStr string, args ..
|
||||
}
|
||||
|
||||
func (f *fakeAPIs) ReportResult(err error) {
|
||||
klog.V(2).Infof("ReportResult %v", err)
|
||||
f.logger.V(2).Info("ReportResult", "err", err)
|
||||
f.results = append(f.results, err)
|
||||
if f.reportChan != nil {
|
||||
f.reportChan <- struct{}{}
|
||||
@ -106,7 +108,7 @@ func (f *fakeAPIs) ResyncTimeout() time.Duration {
|
||||
|
||||
func (f *fakeAPIs) dumpTrace() {
|
||||
for i, x := range f.calls {
|
||||
klog.Infof("trace %v: %v", i, x)
|
||||
f.logger.Info("trace", "index", i, "call", x)
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,12 +198,13 @@ func TestNodeSyncUpdate(t *testing.T) {
|
||||
wantError: false,
|
||||
},
|
||||
} {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24)
|
||||
tc.fake.logger = logger
|
||||
sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidr)
|
||||
doneChan := make(chan struct{})
|
||||
|
||||
// Do a single step of the loop.
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
go sync.Loop(logger, doneChan)
|
||||
sync.Update(tc.node)
|
||||
close(sync.opChan)
|
||||
@ -224,15 +227,16 @@ func TestNodeSyncUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeSyncResync(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
fake := &fakeAPIs{
|
||||
nodeRet: nodeWithCIDRRange,
|
||||
resyncTimeout: time.Millisecond,
|
||||
reportChan: make(chan struct{}),
|
||||
logger: logger,
|
||||
}
|
||||
cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24)
|
||||
sync := New(fake, fake, fake, SyncFromCluster, "node1", cidr)
|
||||
doneChan := make(chan struct{})
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
go sync.Loop(logger, doneChan)
|
||||
<-fake.reportChan
|
||||
close(sync.opChan)
|
||||
@ -272,12 +276,13 @@ func TestNodeSyncDelete(t *testing.T) {
|
||||
},
|
||||
},
|
||||
} {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24)
|
||||
tc.fake.logger = logger
|
||||
sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidr)
|
||||
doneChan := make(chan struct{})
|
||||
|
||||
// Do a single step of the loop.
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
go sync.Loop(logger, doneChan)
|
||||
sync.Delete(tc.node)
|
||||
<-doneChan
|
||||
|
@ -422,7 +422,7 @@ func NewNodeLifecycleController(
|
||||
nc.taintManager.NodeUpdated(oldNode, newNode)
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
|
||||
nc.taintManager.NodeUpdated(node, nil)
|
||||
return nil
|
||||
}),
|
||||
@ -438,7 +438,7 @@ func NewNodeLifecycleController(
|
||||
nc.nodeUpdateQueue.Add(newNode.Name)
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
|
||||
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
|
||||
nc.nodesToRetry.Delete(node.Name)
|
||||
return nil
|
||||
}),
|
||||
@ -744,7 +744,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
|
||||
switch {
|
||||
case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
|
||||
// Report node event only once when status changed.
|
||||
controllerutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
|
||||
controllerutil.RecordNodeStatusChange(logger, nc.recorder, node, "NodeNotReady")
|
||||
fallthrough
|
||||
case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
|
||||
if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil {
|
||||
|
@ -121,7 +121,7 @@ func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.Replic
|
||||
if err := metrics.Register(legacyregistry.Register); err != nil {
|
||||
logger.Error(err, "unable to register metrics")
|
||||
}
|
||||
return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
|
||||
return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
|
||||
apps.SchemeGroupVersion.WithKind("ReplicaSet"),
|
||||
"replicaset_controller",
|
||||
"replicaset",
|
||||
@ -135,7 +135,7 @@ func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.Replic
|
||||
|
||||
// NewBaseController is the implementation of NewReplicaSetController with additional injected
|
||||
// parameters so that it can also serve as the implementation of NewReplicationController.
|
||||
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
|
||||
func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
|
||||
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
|
||||
|
||||
rsc := &ReplicaSetController{
|
||||
@ -149,9 +149,15 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
|
||||
}
|
||||
|
||||
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: rsc.addRS,
|
||||
UpdateFunc: rsc.updateRS,
|
||||
DeleteFunc: rsc.deleteRS,
|
||||
AddFunc: func(obj interface{}) {
|
||||
rsc.addRS(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
rsc.updateRS(logger, oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
rsc.deleteRS(logger, obj)
|
||||
},
|
||||
})
|
||||
rsInformer.Informer().AddIndexers(cache.Indexers{
|
||||
controllerUIDIndex: func(obj interface{}) ([]string, error) {
|
||||
@ -171,12 +177,18 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
|
||||
rsc.rsListerSynced = rsInformer.Informer().HasSynced
|
||||
|
||||
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: rsc.addPod,
|
||||
AddFunc: func(obj interface{}) {
|
||||
rsc.addPod(logger, obj)
|
||||
},
|
||||
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
|
||||
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
|
||||
// local storage, so it should be ok.
|
||||
UpdateFunc: rsc.updatePod,
|
||||
DeleteFunc: rsc.deletePod,
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
rsc.updatePod(logger, oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
rsc.deletePod(logger, obj)
|
||||
},
|
||||
})
|
||||
rsc.podLister = podInformer.Lister()
|
||||
rsc.podListerSynced = podInformer.Informer().HasSynced
|
||||
@ -293,14 +305,14 @@ func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration ti
|
||||
rsc.queue.AddAfter(key, duration)
|
||||
}
|
||||
|
||||
func (rsc *ReplicaSetController) addRS(obj interface{}) {
|
||||
func (rsc *ReplicaSetController) addRS(logger klog.Logger, obj interface{}) {
|
||||
rs := obj.(*apps.ReplicaSet)
|
||||
klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
|
||||
logger.V(4).Info("Adding", "replicaSet", klog.KObj(rs))
|
||||
rsc.enqueueRS(rs)
|
||||
}
|
||||
|
||||
// callback when RS is updated
|
||||
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
|
||||
func (rsc *ReplicaSetController) updateRS(logger klog.Logger, old, cur interface{}) {
|
||||
oldRS := old.(*apps.ReplicaSet)
|
||||
curRS := cur.(*apps.ReplicaSet)
|
||||
|
||||
@ -311,7 +323,7 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
|
||||
return
|
||||
}
|
||||
rsc.deleteRS(cache.DeletedFinalStateUnknown{
|
||||
rsc.deleteRS(logger, cache.DeletedFinalStateUnknown{
|
||||
Key: key,
|
||||
Obj: oldRS,
|
||||
})
|
||||
@ -330,12 +342,12 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
|
||||
// that bad as ReplicaSets that haven't met expectations yet won't
|
||||
// sync, and all the listing is done using local stores.
|
||||
if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
|
||||
klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
|
||||
logger.V(4).Info("replicaSet updated. Desired pod count change.", "replicaSet", klog.KObj(oldRS), "oldReplicas", *(oldRS.Spec.Replicas), "newReplicas", *(curRS.Spec.Replicas))
|
||||
}
|
||||
rsc.enqueueRS(curRS)
|
||||
}
|
||||
|
||||
func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
|
||||
func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) {
|
||||
rs, ok := obj.(*apps.ReplicaSet)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
@ -356,7 +368,7 @@ func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(4).Infof("Deleting %s %q", rsc.Kind, key)
|
||||
logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs))
|
||||
|
||||
// Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
|
||||
rsc.expectations.DeleteExpectations(key)
|
||||
@ -365,13 +377,13 @@ func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
|
||||
}
|
||||
|
||||
// When a pod is created, enqueue the replica set that manages it and update its expectations.
|
||||
func (rsc *ReplicaSetController) addPod(obj interface{}) {
|
||||
func (rsc *ReplicaSetController) addPod(logger klog.Logger, obj interface{}) {
|
||||
pod := obj.(*v1.Pod)
|
||||
|
||||
if pod.DeletionTimestamp != nil {
|
||||
// on a restart of the controller manager, it's possible a new pod shows up in a state that
|
||||
// is already pending deletion. Prevent the pod from being a creation observation.
|
||||
rsc.deletePod(pod)
|
||||
rsc.deletePod(logger, pod)
|
||||
return
|
||||
}
|
||||
|
||||
@ -385,7 +397,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
|
||||
logger.V(4).Info("Pod created", "pod", klog.KObj(pod), "detail", pod)
|
||||
rsc.expectations.CreationObserved(rsKey)
|
||||
rsc.queue.Add(rsKey)
|
||||
return
|
||||
@ -399,7 +411,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
|
||||
if len(rss) == 0 {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
|
||||
logger.V(4).Info("Orphan Pod created", "pod", klog.KObj(pod), "detail", pod)
|
||||
for _, rs := range rss {
|
||||
rsc.enqueueRS(rs)
|
||||
}
|
||||
@ -408,7 +420,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
|
||||
// When a pod is updated, figure out what replica set/s manage it and wake them
|
||||
// up. If the labels of the pod have changed we need to awaken both the old
|
||||
// and new replica set. old and cur must be *v1.Pod types.
|
||||
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
||||
func (rsc *ReplicaSetController) updatePod(logger klog.Logger, old, cur interface{}) {
|
||||
curPod := cur.(*v1.Pod)
|
||||
oldPod := old.(*v1.Pod)
|
||||
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||
@ -424,10 +436,10 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
||||
// for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
|
||||
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
|
||||
// an rs never initiates a phase change, and so is never asleep waiting for the same.
|
||||
rsc.deletePod(curPod)
|
||||
rsc.deletePod(logger, curPod)
|
||||
if labelChanged {
|
||||
// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
|
||||
rsc.deletePod(oldPod)
|
||||
rsc.deletePod(logger, oldPod)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -448,7 +460,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
||||
if rs == nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
|
||||
logger.V(4).Info("Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta)
|
||||
rsc.enqueueRS(rs)
|
||||
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
|
||||
// the Pod status which in turn will trigger a requeue of the owning replica set thus
|
||||
@ -458,7 +470,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
||||
// Note that this still suffers from #29229, we are just moving the problem one level
|
||||
// "closer" to kubelet (from the deployment to the replica set controller).
|
||||
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
|
||||
klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
|
||||
logger.V(2).Info("pod will be enqueued after a while for availability check", "duration", rs.Spec.MinReadySeconds, "kind", rsc.Kind, "pod", klog.KObj(oldPod))
|
||||
// Add a second to avoid milliseconds skew in AddAfter.
|
||||
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
|
||||
rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
|
||||
@ -473,7 +485,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
||||
if len(rss) == 0 {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
|
||||
logger.V(4).Info("Orphan Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta)
|
||||
for _, rs := range rss {
|
||||
rsc.enqueueRS(rs)
|
||||
}
|
||||
@ -482,7 +494,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
||||
|
||||
// When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
|
||||
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
|
||||
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
|
||||
func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{}) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
|
||||
// When a delete is dropped, the relist will notice a pod in the store not
|
||||
@ -516,7 +528,7 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
|
||||
logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod))
|
||||
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
|
||||
rsc.queue.Add(rsKey)
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, labelMap map[s
|
||||
|
||||
// processSync initiates a sync via processNextWorkItem() to test behavior that
|
||||
// depends on both functions (such as re-queueing on sync error).
|
||||
func processSync(rsc *ReplicaSetController, key string) error {
|
||||
func processSync(ctx context.Context, rsc *ReplicaSetController, key string) error {
|
||||
// Save old syncHandler and replace with one that captures the error.
|
||||
oldSyncHandler := rsc.syncHandler
|
||||
defer func() {
|
||||
@ -194,7 +194,7 @@ func processSync(rsc *ReplicaSetController, key string) error {
|
||||
return syncErr
|
||||
}
|
||||
rsc.queue.Add(key)
|
||||
rsc.processNextWorkItem(context.TODO())
|
||||
rsc.processNextWorkItem(ctx)
|
||||
return syncErr
|
||||
}
|
||||
|
||||
@ -215,6 +215,7 @@ func validateSyncReplicaSet(fakePodControl *controller.FakePodControl, expectedC
|
||||
}
|
||||
|
||||
func TestSyncReplicaSetDoesNothing(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
stopCh := make(chan struct{})
|
||||
@ -228,7 +229,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
|
||||
newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 2, v1.PodRunning, labelMap, rsSpec, "pod")
|
||||
|
||||
manager.podControl = &fakePodControl
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -236,6 +237,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
stopCh := make(chan struct{})
|
||||
@ -255,9 +257,9 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
rsSpec := newReplicaSet(1, labelMap)
|
||||
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
|
||||
pods := newPodList(nil, 1, v1.PodRunning, labelMap, rsSpec, "pod")
|
||||
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
|
||||
manager.deletePod(logger, cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
|
||||
|
||||
go manager.worker(context.TODO())
|
||||
go manager.worker(ctx)
|
||||
|
||||
expected := GetKey(rsSpec, t)
|
||||
select {
|
||||
@ -273,6 +275,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
// Tell the rs to create 100 replicas, but simulate a limit (like a quota limit)
|
||||
// of 10, and verify that the rs doesn't make 100 create calls per sync pass
|
||||
func TestSyncReplicaSetCreateFailures(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
fakePodControl.CreateLimit = 10
|
||||
|
||||
@ -286,7 +289,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) {
|
||||
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
|
||||
|
||||
manager.podControl = &fakePodControl
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rs, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -302,6 +305,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) {
|
||||
|
||||
func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
// Setup a test server so we can lie about the current state of pods
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
fakeHandler := utiltesting.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: "{}",
|
||||
@ -328,7 +332,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
rsSpec.Status.Replicas = 1
|
||||
rsSpec.Status.ReadyReplicas = 1
|
||||
rsSpec.Status.AvailableReplicas = 1
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -339,7 +343,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
rsSpec.Status.ReadyReplicas = 0
|
||||
rsSpec.Status.AvailableReplicas = 0
|
||||
fakePodControl.Clear()
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -360,7 +364,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
fakePodControl.Clear()
|
||||
fakePodControl.Err = fmt.Errorf("fake Error")
|
||||
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -369,7 +373,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
// This replica should not need a Lowering of expectations, since the previous create failed
|
||||
fakePodControl.Clear()
|
||||
fakePodControl.Err = nil
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -597,7 +601,7 @@ func TestRelatedPodsLookup(t *testing.T) {
|
||||
}
|
||||
for _, pod := range c.pods {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
|
||||
manager.addPod(pod)
|
||||
manager.addPod(logger, pod)
|
||||
}
|
||||
actualPods, err := manager.getIndirectlyRelatedPods(logger, c.rs)
|
||||
if err != nil {
|
||||
@ -616,13 +620,14 @@ func TestRelatedPodsLookup(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchControllers(t *testing.T) {
|
||||
|
||||
fakeWatch := watch.NewFake()
|
||||
client := fake.NewSimpleClientset()
|
||||
client.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fakeWatch, nil))
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager := NewReplicaSetController(
|
||||
logger,
|
||||
informers.Apps().V1().ReplicaSets(),
|
||||
@ -653,7 +658,7 @@ func TestWatchControllers(t *testing.T) {
|
||||
}
|
||||
// Start only the ReplicaSet watcher and the workqueue, send a watch event,
|
||||
// and make sure it hits the sync method.
|
||||
go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond)
|
||||
go wait.UntilWithContext(ctx, manager.worker, 10*time.Millisecond)
|
||||
|
||||
testRSSpec.Name = "foo"
|
||||
fakeWatch.Add(&testRSSpec)
|
||||
@ -666,6 +671,7 @@ func TestWatchControllers(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchPods(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
client := fake.NewSimpleClientset()
|
||||
|
||||
fakeWatch := watch.NewFake()
|
||||
@ -703,7 +709,7 @@ func TestWatchPods(t *testing.T) {
|
||||
// Start only the pod watcher and the workqueue, send a watch event,
|
||||
// and make sure it hits the sync method for the right ReplicaSet.
|
||||
go informers.Core().V1().Pods().Informer().Run(stopCh)
|
||||
go manager.Run(context.TODO(), 1)
|
||||
go manager.Run(ctx, 1)
|
||||
|
||||
pods := newPodList(nil, 1, v1.PodRunning, labelMap, testRSSpec, "pod")
|
||||
testPod := pods.Items[0]
|
||||
@ -718,6 +724,7 @@ func TestWatchPods(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdatePods(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
manager, informers := testNewReplicaSetControllerFromClient(t, fake.NewSimpleClientset(), stopCh, BurstReplicas)
|
||||
@ -737,7 +744,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
return nil
|
||||
}
|
||||
|
||||
go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond)
|
||||
go wait.UntilWithContext(ctx, manager.worker, 10*time.Millisecond)
|
||||
|
||||
// Put 2 ReplicaSets and one pod into the informers
|
||||
labelMap1 := map[string]string{"foo": "bar"}
|
||||
@ -760,7 +767,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
pod2 := pod1
|
||||
pod2.Labels = labelMap2
|
||||
pod2.ResourceVersion = "2"
|
||||
manager.updatePod(&pod1, &pod2)
|
||||
manager.updatePod(logger, &pod1, &pod2)
|
||||
expected := sets.NewString(testRSSpec1.Name)
|
||||
for _, name := range expected.List() {
|
||||
t.Logf("Expecting update for %+v", name)
|
||||
@ -782,7 +789,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
pod2 = pod1
|
||||
pod2.OwnerReferences = nil
|
||||
pod2.ResourceVersion = "2"
|
||||
manager.updatePod(&pod1, &pod2)
|
||||
manager.updatePod(logger, &pod1, &pod2)
|
||||
expected = sets.NewString(testRSSpec2.Name)
|
||||
for _, name := range expected.List() {
|
||||
t.Logf("Expecting update for %+v", name)
|
||||
@ -805,7 +812,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
pod2 = pod1
|
||||
pod2.OwnerReferences = nil
|
||||
pod2.ResourceVersion = "2"
|
||||
manager.updatePod(&pod1, &pod2)
|
||||
manager.updatePod(logger, &pod1, &pod2)
|
||||
expected = sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
|
||||
for _, name := range expected.List() {
|
||||
t.Logf("Expecting update for %+v", name)
|
||||
@ -827,7 +834,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
pod2 = pod1
|
||||
pod2.Labels = labelMap2
|
||||
pod2.ResourceVersion = "2"
|
||||
manager.updatePod(&pod1, &pod2)
|
||||
manager.updatePod(logger, &pod1, &pod2)
|
||||
expected = sets.NewString(testRSSpec2.Name)
|
||||
for _, name := range expected.List() {
|
||||
t.Logf("Expecting update for %+v", name)
|
||||
@ -843,6 +850,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestControllerUpdateRequeue(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
// This server should force a requeue of the controller because it fails to update status.Replicas.
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rs := newReplicaSet(1, labelMap)
|
||||
@ -868,7 +876,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
|
||||
// Enqueue once. Then process it. Disable rate-limiting for this.
|
||||
manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter())
|
||||
manager.enqueueRS(rs)
|
||||
manager.processNextWorkItem(context.TODO())
|
||||
manager.processNextWorkItem(ctx)
|
||||
// It should have been requeued.
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Errorf("queue.Len() = %v, want %v", got, want)
|
||||
@ -923,6 +931,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
|
||||
// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
|
||||
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rsSpec := newReplicaSet(numReplicas, labelMap)
|
||||
client := fake.NewSimpleClientset(rsSpec)
|
||||
@ -949,7 +958,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
|
||||
|
||||
for i := 0; i < numReplicas; i += burstReplicas {
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
|
||||
// The store accrues active pods. It's also used by the ReplicaSet to determine how many
|
||||
// replicas to create.
|
||||
@ -972,7 +981,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
// None of these should wake the controller because it has expectations==BurstReplicas.
|
||||
for i := int32(0); i < expectedPods-1; i++ {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[i])
|
||||
manager.addPod(&pods.Items[i])
|
||||
manager.addPod(logger, &pods.Items[i])
|
||||
}
|
||||
|
||||
podExp, exists, err := manager.expectations.GetExpectations(rsKey)
|
||||
@ -1015,7 +1024,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
// don't double delete.
|
||||
for i := range podsToDelete[1:] {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Delete(podsToDelete[i])
|
||||
manager.deletePod(podsToDelete[i])
|
||||
manager.deletePod(logger, podsToDelete[i])
|
||||
}
|
||||
podExp, exists, err := manager.expectations.GetExpectations(rsKey)
|
||||
if !exists || err != nil {
|
||||
@ -1028,7 +1037,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
|
||||
// Check that the ReplicaSet didn't take any action for all the above pods
|
||||
fakePodControl.Clear()
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -1039,7 +1048,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
// which will cause it to create/delete the remaining replicas up to burstReplicas.
|
||||
if replicas != 0 {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[expectedPods-1])
|
||||
manager.addPod(&pods.Items[expectedPods-1])
|
||||
manager.addPod(logger, &pods.Items[expectedPods-1])
|
||||
} else {
|
||||
expectedDel := manager.expectations.GetUIDs(GetKey(rsSpec, t))
|
||||
if expectedDel.Len() != 1 {
|
||||
@ -1058,7 +1067,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
},
|
||||
}
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Delete(lastPod)
|
||||
manager.deletePod(lastPod)
|
||||
manager.deletePod(logger, lastPod)
|
||||
}
|
||||
pods.Items = pods.Items[expectedPods:]
|
||||
}
|
||||
@ -1093,6 +1102,7 @@ func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool {
|
||||
// TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods
|
||||
// and checking expectations.
|
||||
func TestRSSyncExpectations(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
stopCh := make(chan struct{})
|
||||
@ -1115,7 +1125,7 @@ func TestRSSyncExpectations(t *testing.T) {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(&postExpectationsPod)
|
||||
},
|
||||
})
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -1123,6 +1133,7 @@ func TestRSSyncExpectations(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
rs := newReplicaSet(1, map[string]string{"foo": "bar"})
|
||||
client := fake.NewSimpleClientset(rs)
|
||||
stopCh := make(chan struct{})
|
||||
@ -1135,7 +1146,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
manager.podControl = &fakePodControl
|
||||
|
||||
// This should set expectations for the ReplicaSet
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rs, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -1155,8 +1166,8 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
t.Errorf("No expectations found for ReplicaSet")
|
||||
}
|
||||
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs)
|
||||
manager.deleteRS(rs)
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
manager.deleteRS(logger, rs)
|
||||
manager.syncReplicaSet(ctx, GetKey(rs, t))
|
||||
|
||||
_, exists, err = manager.expectations.GetExpectations(rsKey)
|
||||
if err != nil {
|
||||
@ -1169,7 +1180,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
// This should have no effect, since we've deleted the ReplicaSet.
|
||||
podExp.Add(-1, 0)
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0")
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
manager.syncReplicaSet(ctx, GetKey(rs, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -1177,6 +1188,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestExpectationsOnRecreate(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
client := fake.NewSimpleClientset()
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
@ -1200,7 +1212,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
}
|
||||
|
||||
oldRS := newReplicaSet(1, map[string]string{"foo": "bar"})
|
||||
oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(context.TODO(), oldRS, metav1.CreateOptions{})
|
||||
oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(ctx, oldRS, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1213,7 +1225,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
t.Fatalf("initial RS didn't result in new item in the queue: %v", err)
|
||||
}
|
||||
|
||||
ok := manager.processNextWorkItem(context.TODO())
|
||||
ok := manager.processNextWorkItem(ctx)
|
||||
if !ok {
|
||||
t.Fatal("queue is shutting down")
|
||||
}
|
||||
@ -1244,7 +1256,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
t.Fatal("Unexpected item in the queue")
|
||||
}
|
||||
|
||||
err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(context.TODO(), oldRS.Name, metav1.DeleteOptions{})
|
||||
err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(ctx, oldRS.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1281,7 +1293,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
|
||||
newRS := oldRS.DeepCopy()
|
||||
newRS.UID = uuid.NewUUID()
|
||||
newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), newRS, metav1.CreateOptions{})
|
||||
newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(ctx, newRS, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1299,7 +1311,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err)
|
||||
}
|
||||
|
||||
ok = manager.processNextWorkItem(context.TODO())
|
||||
ok = manager.processNextWorkItem(ctx)
|
||||
if !ok {
|
||||
t.Fatal("Queue is shutting down!")
|
||||
}
|
||||
@ -1338,6 +1350,7 @@ func shuffle(controllers []*apps.ReplicaSet) []*apps.ReplicaSet {
|
||||
}
|
||||
|
||||
func TestOverlappingRSs(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
|
||||
@ -1373,7 +1386,7 @@ func TestOverlappingRSs(t *testing.T) {
|
||||
}
|
||||
rsKey := GetKey(rs, t)
|
||||
|
||||
manager.addPod(pod)
|
||||
manager.addPod(logger, pod)
|
||||
queueRS, _ := manager.queue.Get()
|
||||
if queueRS != rsKey {
|
||||
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
|
||||
@ -1381,6 +1394,7 @@ func TestOverlappingRSs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeletionTimestamp(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
stopCh := make(chan struct{})
|
||||
@ -1399,7 +1413,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
||||
|
||||
// A pod added with a deletion timestamp should decrement deletions, not creations.
|
||||
manager.addPod(&pod)
|
||||
manager.addPod(logger, &pod)
|
||||
|
||||
queueRS, _ := manager.queue.Get()
|
||||
if queueRS != rsKey {
|
||||
@ -1417,7 +1431,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
oldPod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0]
|
||||
oldPod.ResourceVersion = "2"
|
||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
||||
manager.updatePod(&oldPod, &pod)
|
||||
manager.updatePod(logger, &oldPod, &pod)
|
||||
|
||||
queueRS, _ = manager.queue.Get()
|
||||
if queueRS != rsKey {
|
||||
@ -1446,7 +1460,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
|
||||
oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
|
||||
oldPod.ResourceVersion = "2"
|
||||
manager.updatePod(&oldPod, &pod)
|
||||
manager.updatePod(logger, &oldPod, &pod)
|
||||
|
||||
podExp, exists, err = manager.expectations.GetExpectations(rsKey)
|
||||
if !exists || err != nil || podExp.Fulfilled() {
|
||||
@ -1455,14 +1469,14 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
|
||||
// A pod with a non-nil deletion timestamp should also be ignored by the
|
||||
// delete handler, because it's already been counted in the update.
|
||||
manager.deletePod(&pod)
|
||||
manager.deletePod(logger, &pod)
|
||||
podExp, exists, err = manager.expectations.GetExpectations(rsKey)
|
||||
if !exists || err != nil || podExp.Fulfilled() {
|
||||
t.Fatalf("Wrong expectations %#v", podExp)
|
||||
}
|
||||
|
||||
// Deleting the second pod should clear expectations.
|
||||
manager.deletePod(secondPod)
|
||||
manager.deletePod(logger, secondPod)
|
||||
|
||||
queueRS, _ = manager.queue.Get()
|
||||
if queueRS != rsKey {
|
||||
@ -1487,6 +1501,7 @@ func setupManagerWithGCEnabled(t *testing.T, stopCh chan struct{}, objs ...runti
|
||||
}
|
||||
|
||||
func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rs := newReplicaSet(2, labelMap)
|
||||
stopCh := make(chan struct{})
|
||||
@ -1499,7 +1514,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
|
||||
pod := newPod("pod", rs, v1.PodRunning, nil, true)
|
||||
pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference}
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
|
||||
err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
err := manager.syncReplicaSet(ctx, GetKey(rs, t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1511,6 +1526,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPatchPodFails(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rs := newReplicaSet(2, labelMap)
|
||||
stopCh := make(chan struct{})
|
||||
@ -1525,7 +1541,7 @@ func TestPatchPodFails(t *testing.T) {
|
||||
// control of the pods and requeue to try again.
|
||||
fakePodControl.Err = fmt.Errorf("fake Error")
|
||||
rsKey := GetKey(rs, t)
|
||||
err := processSync(manager, rsKey)
|
||||
err := processSync(ctx, manager, rsKey)
|
||||
if err == nil || !strings.Contains(err.Error(), "fake Error") {
|
||||
t.Errorf("expected fake Error, got %+v", err)
|
||||
}
|
||||
@ -1544,6 +1560,7 @@ func TestPatchPodFails(t *testing.T) {
|
||||
// RS controller shouldn't adopt or create more pods if the rc is about to be
|
||||
// deleted.
|
||||
func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rs := newReplicaSet(2, labelMap)
|
||||
now := metav1.Now()
|
||||
@ -1556,7 +1573,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
|
||||
|
||||
// no patch, no create
|
||||
err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
err := manager.syncReplicaSet(ctx, GetKey(rs, t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1567,6 +1584,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
// Bare client says it IS deleted.
|
||||
rs := newReplicaSet(2, labelMap)
|
||||
@ -1585,7 +1603,7 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
|
||||
|
||||
// sync should abort.
|
||||
err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
err := manager.syncReplicaSet(ctx, GetKey(rs, t))
|
||||
if err == nil {
|
||||
t.Error("syncReplicaSet() err = nil, expected non-nil")
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/replicaset"
|
||||
)
|
||||
@ -47,10 +48,10 @@ type ReplicationManager struct {
|
||||
}
|
||||
|
||||
// NewReplicationManager configures a replication manager with the specified event recorder
|
||||
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
|
||||
func NewReplicationManager(logger klog.Logger, podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
return &ReplicationManager{
|
||||
*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
|
||||
*replicaset.NewBaseController(logger, informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
|
||||
v1.SchemeGroupVersion.WithKind("ReplicationController"),
|
||||
"replication_controller",
|
||||
"replicationmanager",
|
||||
|
@ -175,7 +175,7 @@ func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeNam
|
||||
}
|
||||
|
||||
// RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam)
|
||||
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
|
||||
func RecordNodeStatusChange(logger klog.Logger, recorder record.EventRecorder, node *v1.Node, newStatus string) {
|
||||
ref := &v1.ObjectReference{
|
||||
APIVersion: "v1",
|
||||
Kind: "Node",
|
||||
@ -183,7 +183,7 @@ func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newSta
|
||||
UID: node.UID,
|
||||
Namespace: "",
|
||||
}
|
||||
klog.V(2).InfoS("Recording status change event message for node", "status", newStatus, "node", node.Name)
|
||||
logger.V(2).Info("Recording status change event message for node", "status", newStatus, "node", node.Name)
|
||||
// TODO: This requires a transaction, either both node status is updated
|
||||
// and event is recorded or neither should happen, see issue #6055.
|
||||
recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
|
||||
@ -265,7 +265,7 @@ func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb
|
||||
}
|
||||
|
||||
// CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
|
||||
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
|
||||
func CreateDeleteNodeHandler(logger klog.Logger, f func(node *v1.Node) error) func(obj interface{}) {
|
||||
return func(originalObj interface{}) {
|
||||
originalNode, isNode := originalObj.(*v1.Node)
|
||||
// We can get DeletedFinalStateUnknown instead of *v1.Node here and
|
||||
@ -273,12 +273,12 @@ func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{})
|
||||
if !isNode {
|
||||
deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "Received unexpected object", "object", originalObj)
|
||||
logger.Error(nil, "Received unexpected object", "object", originalObj)
|
||||
return
|
||||
}
|
||||
originalNode, ok = deletedState.Obj.(*v1.Node)
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "DeletedFinalStateUnknown contained non-Node object", "object", deletedState.Obj)
|
||||
logger.Error(nil, "DeletedFinalStateUnknown contained non-Node object", "object", deletedState.Obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -874,6 +874,7 @@ func (adc *attachDetachController) GetServiceAccountTokenFunc() func(_, _ string
|
||||
|
||||
func (adc *attachDetachController) DeleteServiceAccountTokenFunc() func(types.UID) {
|
||||
return func(types.UID) {
|
||||
// nolint:logcheck
|
||||
klog.ErrorS(nil, "DeleteServiceAccountToken unsupported in attachDetachController")
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
@ -376,7 +375,6 @@ func (plugin *TestPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
|
||||
plugin.pluginLock.Lock()
|
||||
defer plugin.pluginLock.Unlock()
|
||||
if spec == nil {
|
||||
klog.ErrorS(nil, "GetVolumeName called with nil volume spec")
|
||||
plugin.ErrorEncountered = true
|
||||
return "", fmt.Errorf("GetVolumeName called with nil volume spec")
|
||||
}
|
||||
@ -400,7 +398,6 @@ func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool {
|
||||
plugin.pluginLock.Lock()
|
||||
defer plugin.pluginLock.Unlock()
|
||||
if spec == nil {
|
||||
klog.ErrorS(nil, "CanSupport called with nil volume spec")
|
||||
plugin.ErrorEncountered = true
|
||||
}
|
||||
return true
|
||||
@ -414,7 +411,6 @@ func (plugin *TestPlugin) NewMounter(spec *volume.Spec, podRef *v1.Pod, opts vol
|
||||
plugin.pluginLock.Lock()
|
||||
defer plugin.pluginLock.Unlock()
|
||||
if spec == nil {
|
||||
klog.ErrorS(nil, "NewMounter called with nil volume spec")
|
||||
plugin.ErrorEncountered = true
|
||||
}
|
||||
return nil, nil
|
||||
@ -540,7 +536,6 @@ func (attacher *testPluginAttacher) Attach(spec *volume.Spec, nodeName types.Nod
|
||||
defer attacher.pluginLock.Unlock()
|
||||
if spec == nil {
|
||||
*attacher.ErrorEncountered = true
|
||||
klog.ErrorS(nil, "Attach called with nil volume spec")
|
||||
return "", fmt.Errorf("Attach called with nil volume spec")
|
||||
}
|
||||
attacher.attachedVolumeMap[string(nodeName)] = append(attacher.attachedVolumeMap[string(nodeName)], spec.Name())
|
||||
@ -556,7 +551,6 @@ func (attacher *testPluginAttacher) WaitForAttach(spec *volume.Spec, devicePath
|
||||
defer attacher.pluginLock.Unlock()
|
||||
if spec == nil {
|
||||
*attacher.ErrorEncountered = true
|
||||
klog.ErrorS(nil, "WaitForAttach called with nil volume spec")
|
||||
return "", fmt.Errorf("WaitForAttach called with nil volume spec")
|
||||
}
|
||||
fakePath := fmt.Sprintf("%s/%s", devicePath, spec.Name())
|
||||
@ -568,7 +562,6 @@ func (attacher *testPluginAttacher) GetDeviceMountPath(spec *volume.Spec) (strin
|
||||
defer attacher.pluginLock.Unlock()
|
||||
if spec == nil {
|
||||
*attacher.ErrorEncountered = true
|
||||
klog.ErrorS(nil, "GetDeviceMountPath called with nil volume spec")
|
||||
return "", fmt.Errorf("GetDeviceMountPath called with nil volume spec")
|
||||
}
|
||||
return "", nil
|
||||
@ -579,7 +572,6 @@ func (attacher *testPluginAttacher) MountDevice(spec *volume.Spec, devicePath st
|
||||
defer attacher.pluginLock.Unlock()
|
||||
if spec == nil {
|
||||
*attacher.ErrorEncountered = true
|
||||
klog.ErrorS(nil, "MountDevice called with nil volume spec")
|
||||
return fmt.Errorf("MountDevice called with nil volume spec")
|
||||
}
|
||||
return nil
|
||||
|
@ -454,6 +454,7 @@ func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *
|
||||
|
||||
func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) {
|
||||
return func(types.UID) {
|
||||
//nolint:logcheck
|
||||
klog.ErrorS(nil, "DeleteServiceAccountToken unsupported in expandController")
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,6 @@ import (
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/component-helpers/storage/volume"
|
||||
csitrans "k8s.io/csi-translation-lib"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
|
||||
@ -311,7 +310,7 @@ func TestControllerSync(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
doit := func(test controllerTest) {
|
||||
// Initialize the controller
|
||||
client := &fake.Clientset{}
|
||||
@ -372,7 +371,7 @@ func TestControllerSync(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Test %q controller sync failed: %v", test.name, err)
|
||||
}
|
||||
klog.V(4).Infof("controller synced, starting test")
|
||||
logger.V(4).Info("controller synced, starting test")
|
||||
|
||||
// Call the tested function
|
||||
err = test.test(ctrl, reactor.VolumeReactor, test)
|
||||
|
@ -118,6 +118,7 @@ func (ctrl *PersistentVolumeController) GetServiceAccountTokenFunc() func(_, _ s
|
||||
|
||||
func (ctrl *PersistentVolumeController) DeleteServiceAccountTokenFunc() func(types.UID) {
|
||||
return func(types.UID) {
|
||||
//nolint:logcheck
|
||||
klog.ErrorS(nil, "DeleteServiceAccountToken unsupported in PersistentVolumeController")
|
||||
}
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ const (
|
||||
// quota_test.go:100: Took 4.196205966s to scale up without quota
|
||||
// quota_test.go:115: Took 12.021640372s to scale up with quota
|
||||
func TestQuota(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -82,6 +82,7 @@ func TestQuota(t *testing.T) {
|
||||
|
||||
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
||||
rm := replicationcontroller.NewReplicationManager(
|
||||
logger,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Core().V1().ReplicationControllers(),
|
||||
clientset,
|
||||
@ -290,7 +291,7 @@ plugins:
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -312,6 +313,7 @@ plugins:
|
||||
|
||||
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
||||
rm := replicationcontroller.NewReplicationManager(
|
||||
logger,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Core().V1().ReplicationControllers(),
|
||||
clientset,
|
||||
@ -417,7 +419,7 @@ plugins:
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -439,6 +441,7 @@ plugins:
|
||||
|
||||
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
||||
rm := replicationcontroller.NewReplicationManager(
|
||||
logger,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Core().V1().ReplicationControllers(),
|
||||
clientset,
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -38,6 +38,7 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/retry"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller/replication"
|
||||
@ -122,7 +123,9 @@ func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replication.Repl
|
||||
resyncPeriod := 12 * time.Hour
|
||||
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "rc-informers")), resyncPeriod)
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
rm := replication.NewReplicationManager(
|
||||
logger,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Core().V1().ReplicationControllers(),
|
||||
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replication-controller")),
|
||||
|
Loading…
Reference in New Issue
Block a user