Compare commits

..

1 Commits

Author SHA1 Message Date
Kubernetes Publisher
9b108793f0 Update dependencies to v0.27.0-alpha.1 tag 2023-01-25 01:07:15 +00:00
39 changed files with 196 additions and 66093 deletions

View File

@@ -25,29 +25,28 @@ import (
// ContainerApplyConfiguration represents an declarative configuration of the Container type for use
// with apply.
type ContainerApplyConfiguration struct {
Name *string `json:"name,omitempty"`
Image *string `json:"image,omitempty"`
Command []string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
WorkingDir *string `json:"workingDir,omitempty"`
Ports []ContainerPortApplyConfiguration `json:"ports,omitempty"`
EnvFrom []EnvFromSourceApplyConfiguration `json:"envFrom,omitempty"`
Env []EnvVarApplyConfiguration `json:"env,omitempty"`
Resources *ResourceRequirementsApplyConfiguration `json:"resources,omitempty"`
ResizePolicy []ContainerResizePolicyApplyConfiguration `json:"resizePolicy,omitempty"`
VolumeMounts []VolumeMountApplyConfiguration `json:"volumeMounts,omitempty"`
VolumeDevices []VolumeDeviceApplyConfiguration `json:"volumeDevices,omitempty"`
LivenessProbe *ProbeApplyConfiguration `json:"livenessProbe,omitempty"`
ReadinessProbe *ProbeApplyConfiguration `json:"readinessProbe,omitempty"`
StartupProbe *ProbeApplyConfiguration `json:"startupProbe,omitempty"`
Lifecycle *LifecycleApplyConfiguration `json:"lifecycle,omitempty"`
TerminationMessagePath *string `json:"terminationMessagePath,omitempty"`
TerminationMessagePolicy *corev1.TerminationMessagePolicy `json:"terminationMessagePolicy,omitempty"`
ImagePullPolicy *corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
SecurityContext *SecurityContextApplyConfiguration `json:"securityContext,omitempty"`
Stdin *bool `json:"stdin,omitempty"`
StdinOnce *bool `json:"stdinOnce,omitempty"`
TTY *bool `json:"tty,omitempty"`
Name *string `json:"name,omitempty"`
Image *string `json:"image,omitempty"`
Command []string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
WorkingDir *string `json:"workingDir,omitempty"`
Ports []ContainerPortApplyConfiguration `json:"ports,omitempty"`
EnvFrom []EnvFromSourceApplyConfiguration `json:"envFrom,omitempty"`
Env []EnvVarApplyConfiguration `json:"env,omitempty"`
Resources *ResourceRequirementsApplyConfiguration `json:"resources,omitempty"`
VolumeMounts []VolumeMountApplyConfiguration `json:"volumeMounts,omitempty"`
VolumeDevices []VolumeDeviceApplyConfiguration `json:"volumeDevices,omitempty"`
LivenessProbe *ProbeApplyConfiguration `json:"livenessProbe,omitempty"`
ReadinessProbe *ProbeApplyConfiguration `json:"readinessProbe,omitempty"`
StartupProbe *ProbeApplyConfiguration `json:"startupProbe,omitempty"`
Lifecycle *LifecycleApplyConfiguration `json:"lifecycle,omitempty"`
TerminationMessagePath *string `json:"terminationMessagePath,omitempty"`
TerminationMessagePolicy *corev1.TerminationMessagePolicy `json:"terminationMessagePolicy,omitempty"`
ImagePullPolicy *corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
SecurityContext *SecurityContextApplyConfiguration `json:"securityContext,omitempty"`
Stdin *bool `json:"stdin,omitempty"`
StdinOnce *bool `json:"stdinOnce,omitempty"`
TTY *bool `json:"tty,omitempty"`
}
// ContainerApplyConfiguration constructs an declarative configuration of the Container type for use with
@@ -147,19 +146,6 @@ func (b *ContainerApplyConfiguration) WithResources(value *ResourceRequirementsA
return b
}
// WithResizePolicy adds the given value to the ResizePolicy field in the declarative configuration
// and returns the receiver, so that objects can be build by chaining "With" function invocations.
// If called multiple times, values provided by each call will be appended to the ResizePolicy field.
func (b *ContainerApplyConfiguration) WithResizePolicy(values ...*ContainerResizePolicyApplyConfiguration) *ContainerApplyConfiguration {
for i := range values {
if values[i] == nil {
panic("nil value passed to WithResizePolicy")
}
b.ResizePolicy = append(b.ResizePolicy, *values[i])
}
return b
}
// WithVolumeMounts adds the given value to the VolumeMounts field in the declarative configuration
// and returns the receiver, so that objects can be build by chaining "With" function invocations.
// If called multiple times, values provided by each call will be appended to the VolumeMounts field.

View File

@@ -1,52 +0,0 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by applyconfiguration-gen. DO NOT EDIT.
package v1
import (
v1 "k8s.io/api/core/v1"
)
// ContainerResizePolicyApplyConfiguration represents an declarative configuration of the ContainerResizePolicy type for use
// with apply.
type ContainerResizePolicyApplyConfiguration struct {
ResourceName *v1.ResourceName `json:"resourceName,omitempty"`
Policy *v1.ResourceResizePolicy `json:"policy,omitempty"`
}
// ContainerResizePolicyApplyConfiguration constructs an declarative configuration of the ContainerResizePolicy type for use with
// apply.
func ContainerResizePolicy() *ContainerResizePolicyApplyConfiguration {
return &ContainerResizePolicyApplyConfiguration{}
}
// WithResourceName sets the ResourceName field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the ResourceName field is set to the value of the last call.
func (b *ContainerResizePolicyApplyConfiguration) WithResourceName(value v1.ResourceName) *ContainerResizePolicyApplyConfiguration {
b.ResourceName = &value
return b
}
// WithPolicy sets the Policy field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the Policy field is set to the value of the last call.
func (b *ContainerResizePolicyApplyConfiguration) WithPolicy(value v1.ResourceResizePolicy) *ContainerResizePolicyApplyConfiguration {
b.Policy = &value
return b
}

View File

@@ -18,24 +18,18 @@ limitations under the License.
package v1
import (
corev1 "k8s.io/api/core/v1"
)
// ContainerStatusApplyConfiguration represents an declarative configuration of the ContainerStatus type for use
// with apply.
type ContainerStatusApplyConfiguration struct {
Name *string `json:"name,omitempty"`
State *ContainerStateApplyConfiguration `json:"state,omitempty"`
LastTerminationState *ContainerStateApplyConfiguration `json:"lastState,omitempty"`
Ready *bool `json:"ready,omitempty"`
RestartCount *int32 `json:"restartCount,omitempty"`
Image *string `json:"image,omitempty"`
ImageID *string `json:"imageID,omitempty"`
ContainerID *string `json:"containerID,omitempty"`
Started *bool `json:"started,omitempty"`
ResourcesAllocated *corev1.ResourceList `json:"resourcesAllocated,omitempty"`
Resources *ResourceRequirementsApplyConfiguration `json:"resources,omitempty"`
Name *string `json:"name,omitempty"`
State *ContainerStateApplyConfiguration `json:"state,omitempty"`
LastTerminationState *ContainerStateApplyConfiguration `json:"lastState,omitempty"`
Ready *bool `json:"ready,omitempty"`
RestartCount *int32 `json:"restartCount,omitempty"`
Image *string `json:"image,omitempty"`
ImageID *string `json:"imageID,omitempty"`
ContainerID *string `json:"containerID,omitempty"`
Started *bool `json:"started,omitempty"`
}
// ContainerStatusApplyConfiguration constructs an declarative configuration of the ContainerStatus type for use with
@@ -115,19 +109,3 @@ func (b *ContainerStatusApplyConfiguration) WithStarted(value bool) *ContainerSt
b.Started = &value
return b
}
// WithResourcesAllocated sets the ResourcesAllocated field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the ResourcesAllocated field is set to the value of the last call.
func (b *ContainerStatusApplyConfiguration) WithResourcesAllocated(value corev1.ResourceList) *ContainerStatusApplyConfiguration {
b.ResourcesAllocated = &value
return b
}
// WithResources sets the Resources field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the Resources field is set to the value of the last call.
func (b *ContainerStatusApplyConfiguration) WithResources(value *ResourceRequirementsApplyConfiguration) *ContainerStatusApplyConfiguration {
b.Resources = value
return b
}

View File

@@ -126,19 +126,6 @@ func (b *EphemeralContainerApplyConfiguration) WithResources(value *ResourceRequ
return b
}
// WithResizePolicy adds the given value to the ResizePolicy field in the declarative configuration
// and returns the receiver, so that objects can be build by chaining "With" function invocations.
// If called multiple times, values provided by each call will be appended to the ResizePolicy field.
func (b *EphemeralContainerApplyConfiguration) WithResizePolicy(values ...*ContainerResizePolicyApplyConfiguration) *EphemeralContainerApplyConfiguration {
for i := range values {
if values[i] == nil {
panic("nil value passed to WithResizePolicy")
}
b.ResizePolicy = append(b.ResizePolicy, *values[i])
}
return b
}
// WithVolumeMounts adds the given value to the VolumeMounts field in the declarative configuration
// and returns the receiver, so that objects can be build by chaining "With" function invocations.
// If called multiple times, values provided by each call will be appended to the VolumeMounts field.

View File

@@ -25,29 +25,28 @@ import (
// EphemeralContainerCommonApplyConfiguration represents an declarative configuration of the EphemeralContainerCommon type for use
// with apply.
type EphemeralContainerCommonApplyConfiguration struct {
Name *string `json:"name,omitempty"`
Image *string `json:"image,omitempty"`
Command []string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
WorkingDir *string `json:"workingDir,omitempty"`
Ports []ContainerPortApplyConfiguration `json:"ports,omitempty"`
EnvFrom []EnvFromSourceApplyConfiguration `json:"envFrom,omitempty"`
Env []EnvVarApplyConfiguration `json:"env,omitempty"`
Resources *ResourceRequirementsApplyConfiguration `json:"resources,omitempty"`
ResizePolicy []ContainerResizePolicyApplyConfiguration `json:"resizePolicy,omitempty"`
VolumeMounts []VolumeMountApplyConfiguration `json:"volumeMounts,omitempty"`
VolumeDevices []VolumeDeviceApplyConfiguration `json:"volumeDevices,omitempty"`
LivenessProbe *ProbeApplyConfiguration `json:"livenessProbe,omitempty"`
ReadinessProbe *ProbeApplyConfiguration `json:"readinessProbe,omitempty"`
StartupProbe *ProbeApplyConfiguration `json:"startupProbe,omitempty"`
Lifecycle *LifecycleApplyConfiguration `json:"lifecycle,omitempty"`
TerminationMessagePath *string `json:"terminationMessagePath,omitempty"`
TerminationMessagePolicy *corev1.TerminationMessagePolicy `json:"terminationMessagePolicy,omitempty"`
ImagePullPolicy *corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
SecurityContext *SecurityContextApplyConfiguration `json:"securityContext,omitempty"`
Stdin *bool `json:"stdin,omitempty"`
StdinOnce *bool `json:"stdinOnce,omitempty"`
TTY *bool `json:"tty,omitempty"`
Name *string `json:"name,omitempty"`
Image *string `json:"image,omitempty"`
Command []string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
WorkingDir *string `json:"workingDir,omitempty"`
Ports []ContainerPortApplyConfiguration `json:"ports,omitempty"`
EnvFrom []EnvFromSourceApplyConfiguration `json:"envFrom,omitempty"`
Env []EnvVarApplyConfiguration `json:"env,omitempty"`
Resources *ResourceRequirementsApplyConfiguration `json:"resources,omitempty"`
VolumeMounts []VolumeMountApplyConfiguration `json:"volumeMounts,omitempty"`
VolumeDevices []VolumeDeviceApplyConfiguration `json:"volumeDevices,omitempty"`
LivenessProbe *ProbeApplyConfiguration `json:"livenessProbe,omitempty"`
ReadinessProbe *ProbeApplyConfiguration `json:"readinessProbe,omitempty"`
StartupProbe *ProbeApplyConfiguration `json:"startupProbe,omitempty"`
Lifecycle *LifecycleApplyConfiguration `json:"lifecycle,omitempty"`
TerminationMessagePath *string `json:"terminationMessagePath,omitempty"`
TerminationMessagePolicy *corev1.TerminationMessagePolicy `json:"terminationMessagePolicy,omitempty"`
ImagePullPolicy *corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
SecurityContext *SecurityContextApplyConfiguration `json:"securityContext,omitempty"`
Stdin *bool `json:"stdin,omitempty"`
StdinOnce *bool `json:"stdinOnce,omitempty"`
TTY *bool `json:"tty,omitempty"`
}
// EphemeralContainerCommonApplyConfiguration constructs an declarative configuration of the EphemeralContainerCommon type for use with
@@ -147,19 +146,6 @@ func (b *EphemeralContainerCommonApplyConfiguration) WithResources(value *Resour
return b
}
// WithResizePolicy adds the given value to the ResizePolicy field in the declarative configuration
// and returns the receiver, so that objects can be build by chaining "With" function invocations.
// If called multiple times, values provided by each call will be appended to the ResizePolicy field.
func (b *EphemeralContainerCommonApplyConfiguration) WithResizePolicy(values ...*ContainerResizePolicyApplyConfiguration) *EphemeralContainerCommonApplyConfiguration {
for i := range values {
if values[i] == nil {
panic("nil value passed to WithResizePolicy")
}
b.ResizePolicy = append(b.ResizePolicy, *values[i])
}
return b
}
// WithVolumeMounts adds the given value to the VolumeMounts field in the declarative configuration
// and returns the receiver, so that objects can be build by chaining "With" function invocations.
// If called multiple times, values provided by each call will be appended to the VolumeMounts field.

View File

@@ -39,7 +39,6 @@ type PodStatusApplyConfiguration struct {
ContainerStatuses []ContainerStatusApplyConfiguration `json:"containerStatuses,omitempty"`
QOSClass *v1.PodQOSClass `json:"qosClass,omitempty"`
EphemeralContainerStatuses []ContainerStatusApplyConfiguration `json:"ephemeralContainerStatuses,omitempty"`
Resize *v1.PodResizeStatus `json:"resize,omitempty"`
}
// PodStatusApplyConfiguration constructs an declarative configuration of the PodStatus type for use with
@@ -176,11 +175,3 @@ func (b *PodStatusApplyConfiguration) WithEphemeralContainerStatuses(values ...*
}
return b
}
// WithResize sets the Resize field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the Resize field is set to the value of the last call.
func (b *PodStatusApplyConfiguration) WithResize(value v1.PodResizeStatus) *PodStatusApplyConfiguration {
b.Resize = &value
return b
}

View File

@@ -4129,12 +4129,6 @@ var schemaYAML = typed.YAMLObject(`types:
- name: readinessProbe
type:
namedType: io.k8s.api.core.v1.Probe
- name: resizePolicy
type:
list:
elementType:
namedType: io.k8s.api.core.v1.ContainerResizePolicy
elementRelationship: atomic
- name: resources
type:
namedType: io.k8s.api.core.v1.ResourceRequirements
@@ -4211,17 +4205,6 @@ var schemaYAML = typed.YAMLObject(`types:
type:
scalar: string
default: TCP
- name: io.k8s.api.core.v1.ContainerResizePolicy
map:
fields:
- name: policy
type:
scalar: string
default: ""
- name: resourceName
type:
scalar: string
default: ""
- name: io.k8s.api.core.v1.ContainerState
map:
fields:
@@ -4303,14 +4286,6 @@ var schemaYAML = typed.YAMLObject(`types:
type:
scalar: boolean
default: false
- name: resources
type:
namedType: io.k8s.api.core.v1.ResourceRequirements
- name: resourcesAllocated
type:
map:
elementType:
namedType: io.k8s.apimachinery.pkg.api.resource.Quantity
- name: restartCount
type:
scalar: numeric
@@ -4546,12 +4521,6 @@ var schemaYAML = typed.YAMLObject(`types:
- name: readinessProbe
type:
namedType: io.k8s.api.core.v1.Probe
- name: resizePolicy
type:
list:
elementType:
namedType: io.k8s.api.core.v1.ContainerResizePolicy
elementRelationship: atomic
- name: resources
type:
namedType: io.k8s.api.core.v1.ResourceRequirements
@@ -6216,9 +6185,6 @@ var schemaYAML = typed.YAMLObject(`types:
- name: reason
type:
scalar: string
- name: resize
type:
scalar: string
- name: startTime
type:
namedType: io.k8s.apimachinery.pkg.apis.meta.v1.Time
@@ -11449,8 +11415,6 @@ var schemaYAML = typed.YAMLObject(`types:
elementType:
namedType: io.k8s.api.resource.v1alpha1.ResourceClaimConsumerReference
elementRelationship: associative
keys:
- uid
- name: io.k8s.api.resource.v1alpha1.ResourceClaimTemplate
map:
fields:

View File

@@ -579,8 +579,6 @@ func ForKind(kind schema.GroupVersionKind) interface{} {
return &applyconfigurationscorev1.ContainerImageApplyConfiguration{}
case corev1.SchemeGroupVersion.WithKind("ContainerPort"):
return &applyconfigurationscorev1.ContainerPortApplyConfiguration{}
case corev1.SchemeGroupVersion.WithKind("ContainerResizePolicy"):
return &applyconfigurationscorev1.ContainerResizePolicyApplyConfiguration{}
case corev1.SchemeGroupVersion.WithKind("ContainerState"):
return &applyconfigurationscorev1.ContainerStateApplyConfiguration{}
case corev1.SchemeGroupVersion.WithKind("ContainerStateRunning"):

View File

@@ -61,12 +61,6 @@ type dynamicSharedInformerFactory struct {
// This allows Start() to be called multiple times safely.
startedInformers map[schema.GroupVersionResource]bool
tweakListOptions TweakListOptionsFunc
// wg tracks how many goroutines were started.
wg sync.WaitGroup
// shuttingDown is true when Shutdown has been called. It may still be running
// because it needs to wait for goroutines.
shuttingDown bool
}
var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{}
@@ -92,21 +86,9 @@ func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
if f.shuttingDown {
return
}
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer.Informer()
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
go informer.Informer().Run(stopCh)
f.startedInformers[informerType] = true
}
}
@@ -134,15 +116,6 @@ func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
return res
}
func (f *dynamicSharedInformerFactory) Shutdown() {
// Will return immediately if there is nothing to wait for.
defer f.wg.Wait()
f.lock.Lock()
defer f.lock.Unlock()
f.shuttingDown = true
}
// NewFilteredDynamicInformer constructs a new informer for a dynamic type.
func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &dynamicInformer{

View File

@@ -24,28 +24,9 @@ import (
// DynamicSharedInformerFactory provides access to a shared informer and lister for dynamic client
type DynamicSharedInformerFactory interface {
// Start initializes all requested informers. They are handled in goroutines
// which run until the stop channel gets closed.
Start(stopCh <-chan struct{})
// ForResource gives generic access to a shared informer of the matching type.
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer
// WaitForCacheSync blocks until all started informers' caches were synced
// or the stop channel gets closed.
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
// Shutdown marks a factory as shutting down. At that point no new
// informers can be started anymore and Start will return without
// doing anything.
//
// In addition, Shutdown blocks until all goroutines have terminated. For that
// to happen, the close channel(s) that they were started with must be closed,
// either before Shutdown gets called or while it is waiting.
//
// Shutdown may be called multiple times, even concurrently. All such calls will
// block until all goroutines have terminated.
Shutdown()
}
// TweakListOptionsFunc defines the signature of a helper function

24
go.mod
View File

@@ -19,17 +19,16 @@ require (
github.com/peterbourgon/diskv v2.0.1+incompatible
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
golang.org/x/net v0.7.0
golang.org/x/net v0.4.0
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b
golang.org/x/term v0.5.0
golang.org/x/term v0.3.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/protobuf v1.28.1
k8s.io/api v0.27.0-alpha.3
k8s.io/apimachinery v0.27.0-alpha.3
k8s.io/klog/v2 v2.90.1
k8s.io/kube-openapi v0.0.0-20230123231816-1cb3ae25d79a
k8s.io/utils v0.0.0-20230209194617-a36077c30491
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2
k8s.io/api v0.27.0-alpha.1
k8s.io/apimachinery v0.27.0-alpha.1
k8s.io/klog/v2 v2.80.1
k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d
sigs.k8s.io/structured-merge-diff/v4 v4.2.3
sigs.k8s.io/yaml v1.3.0
)
@@ -50,15 +49,16 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
)
replace (
k8s.io/api => k8s.io/api v0.27.0-alpha.3
k8s.io/apimachinery => k8s.io/apimachinery v0.27.0-alpha.3
k8s.io/api => k8s.io/api v0.27.0-alpha.1
k8s.io/apimachinery => k8s.io/apimachinery v0.27.0-alpha.1
)

38
go.sum
View File

@@ -165,7 +165,7 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.7.0 h1:/XxtEV3I3Eif/HobnVx9YmJgk8ENdRsuUmM+fLCFNow=
github.com/onsi/gomega v1.26.0 h1:03cDLK28U6hWvCAns6NeydX3zIm4SF3ci69ulidS32Q=
github.com/onsi/gomega v1.24.2 h1:J/tulyYK6JwBldPViHJReihxxZ+22FHs0piGjQAvoUE=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -260,8 +260,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -306,19 +306,19 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -473,16 +473,16 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.27.0-alpha.3 h1:YrTtpNq3KBAjG5h3OL9kI8ZyEX1NhgqrR1gRyTxd3C0=
k8s.io/api v0.27.0-alpha.3/go.mod h1:BWVtV/y+JWOxq8+/F9Fk4rv506SN3kW4JJsI2aFaHnk=
k8s.io/apimachinery v0.27.0-alpha.3 h1:uujqsdFrbqF+cEbqFHrkLKp+s3XxRgphTpc6Yg84qLo=
k8s.io/apimachinery v0.27.0-alpha.3/go.mod h1:TO4higCGNMwebVSdb1XPJdXMU4kk+nmMY/cTMVCGa6M=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230123231816-1cb3ae25d79a h1:s6zvHjyDQX1NtVT88pvw2tddqhqY0Bz0Gbnn+yctsFU=
k8s.io/kube-openapi v0.0.0-20230123231816-1cb3ae25d79a/go.mod h1:/BYxry62FuDzmI+i9B+X2pqfySRmSOW2ARmj5Zbqhj0=
k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=
k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/api v0.27.0-alpha.1 h1:4L3MEcje+LTMfkZrRaYOzA5a0MTvv3uqonblgQZ39E8=
k8s.io/api v0.27.0-alpha.1/go.mod h1:pUUR9UVsje2ip8mF3GilkKhUt21HVrMB0x0Mzta5HQw=
k8s.io/apimachinery v0.27.0-alpha.1 h1:q8VsOXO0tQsvrz6QSvolMvlciOFCRpCgQBD1lw/MHSo=
k8s.io/apimachinery v0.27.0-alpha.1/go.mod h1:mb1AP2xs7Ajs+OvXRynwIgKVID9/rOtI7lFxIixvFp0=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596 h1:8cNCQs+WqqnSpZ7y0LMQPKD+RZUHU17VqLPMW3qxnxc=
k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596/go.mod h1:/BYxry62FuDzmI+i9B+X2pqfySRmSOW2ARmj5Zbqhj0=
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d h1:0Smp/HP1OH4Rvhe+4B8nWGERtlqAGSftbSbbmm45oFs=
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

View File

@@ -60,11 +60,6 @@ type metadataSharedInformerFactory struct {
// This allows Start() to be called multiple times safely.
startedInformers map[schema.GroupVersionResource]bool
tweakListOptions TweakListOptionsFunc
// wg tracks how many goroutines were started.
wg sync.WaitGroup
// shuttingDown is true when Shutdown has been called. It may still be running
// because it needs to wait for goroutines.
shuttingDown bool
}
var _ SharedInformerFactory = &metadataSharedInformerFactory{}
@@ -90,21 +85,9 @@ func (f *metadataSharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
if f.shuttingDown {
return
}
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer.Informer()
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
go informer.Informer().Run(stopCh)
f.startedInformers[informerType] = true
}
}
@@ -132,15 +115,6 @@ func (f *metadataSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
return res
}
func (f *metadataSharedInformerFactory) Shutdown() {
// Will return immediately if there is nothing to wait for.
defer f.wg.Wait()
f.lock.Lock()
defer f.lock.Unlock()
f.shuttingDown = true
}
// NewFilteredMetadataInformer constructs a new informer for a metadata type.
func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &metadataInformer{

View File

@@ -24,28 +24,9 @@ import (
// SharedInformerFactory provides access to a shared informer and lister for dynamic client
type SharedInformerFactory interface {
// Start initializes all requested informers. They are handled in goroutines
// which run until the stop channel gets closed.
Start(stopCh <-chan struct{})
// ForResource gives generic access to a shared informer of the matching type.
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer
// WaitForCacheSync blocks until all started informers' caches were synced
// or the stop channel gets closed.
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
// Shutdown marks a factory as shutting down. At that point no new
// informers can be started anymore and Start will return without
// doing anything.
//
// In addition, Shutdown blocks until all goroutines have terminated. For that
// to happen, the close channel(s) that they were started with must be closed,
// either before Shutdown gets called or while it is waiting.
//
// Shutdown may be called multiple times, even concurrently. All such calls will
// block until all goroutines have terminated.
Shutdown()
}
// TweakListOptionsFunc defines the signature of a helper function

View File

@@ -1,79 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapitest
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/openapi"
)
// FakeClient implements openapi.Client interface, with hard-coded
// return values, including the possibility to force errors.
type FakeClient struct {
// Hard-coded paths to return from Paths() function.
PathsMap map[string]openapi.GroupVersion
// Hard-coded returned error.
ForcedErr error
}
// Validate FakeClient implements openapi.Client interface.
var _ openapi.Client = &FakeClient{}
// NewFakeClient returns a fake openapi client with an empty PathsMap.
func NewFakeClient() *FakeClient {
return &FakeClient{PathsMap: make(map[string]openapi.GroupVersion)}
}
// Paths returns stored PathsMap field, creating an empty one if
// it does not already exist. If ForcedErr is set, this function
// returns the error instead.
func (f FakeClient) Paths() (map[string]openapi.GroupVersion, error) {
if f.ForcedErr != nil {
return nil, f.ForcedErr
}
return f.PathsMap, nil
}
// FakeGroupVersion implements openapi.GroupVersion with hard-coded
// return GroupVersion specification bytes. If ForcedErr is set, then
// "Schema()" function returns the error instead of the GVSpec.
type FakeGroupVersion struct {
// Hard-coded GroupVersion specification
GVSpec []byte
// Hard-coded returned error.
ForcedErr error
}
// FileOpenAPIGroupVersion implements the openapi.GroupVersion interface.
var _ openapi.GroupVersion = &FakeGroupVersion{}
// Schema returns the hard-coded byte slice, including creating an
// empty slice if it has not been set yet. If the ForcedErr is set,
// this function returns the error instead of the GVSpec field. If
// content type other than application/json is passed, and error is
// returned.
func (f FakeGroupVersion) Schema(contentType string) ([]byte, error) {
if contentType != runtime.ContentTypeJSON {
return nil, fmt.Errorf("application/json is only content type supported: %s", contentType)
}
if f.ForcedErr != nil {
return nil, f.ForcedErr
}
return f.GVSpec, nil
}

View File

@@ -1,107 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapitest
import (
"embed"
"errors"
"path/filepath"
"strings"
"sync"
"testing"
"k8s.io/client-go/openapi"
)
//go:embed testdata/*_openapi.json
var f embed.FS
// NewFileClient returns a test client implementing the openapi.Client
// interface, which serves a subset of hard-coded GroupVersion
// Open API V3 specifications files. The subset of specifications is
// located in the "testdata" subdirectory.
func NewFileClient(t *testing.T) openapi.Client {
if t == nil {
panic("non-nil testing.T required; this package is only for use in tests")
}
return &fileClient{t: t}
}
type fileClient struct {
t *testing.T
init sync.Once
paths map[string]openapi.GroupVersion
err error
}
// fileClient implements the openapi.Client interface.
var _ openapi.Client = &fileClient{}
// Paths returns a map of api path string to openapi.GroupVersion or
// an error. The OpenAPI V3 GroupVersion specifications are hard-coded
// in the "testdata" subdirectory. The api path is derived from the
// spec filename. Example:
//
// apis__apps__v1_openapi.json -> apis/apps/v1
//
// The file contents are read only once. All files must parse correctly
// into an api path, or an error is returned.
func (t *fileClient) Paths() (map[string]openapi.GroupVersion, error) {
t.init.Do(func() {
t.paths = map[string]openapi.GroupVersion{}
entries, err := f.ReadDir("testdata")
if err != nil {
t.err = err
t.t.Error(err)
}
for _, e := range entries {
// this reverses the transformation done in hack/update-openapi-spec.sh
path := strings.ReplaceAll(strings.TrimSuffix(e.Name(), "_openapi.json"), "__", "/")
t.paths[path] = &fileGroupVersion{t: t.t, filename: filepath.Join("testdata", e.Name())}
}
})
return t.paths, t.err
}
type fileGroupVersion struct {
t *testing.T
init sync.Once
filename string
data []byte
err error
}
// fileGroupVersion implements the openapi.GroupVersion interface.
var _ openapi.GroupVersion = &fileGroupVersion{}
// Schema returns the OpenAPI V3 specification for the GroupVersion as
// unstructured bytes, or an error if the contentType is not
// "application/json" or there is an error reading the spec file. The
// file is read only once. The embedded file is located in the "testdata"
// subdirectory.
func (t *fileGroupVersion) Schema(contentType string) ([]byte, error) {
if contentType != "application/json" {
return nil, errors.New("openapitest only supports 'application/json' contentType")
}
t.init.Do(func() {
t.data, t.err = f.ReadFile(t.filename)
if t.err != nil {
t.t.Error(t.err)
}
})
return t.data, t.err
}

View File

@@ -1,66 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapitest
import (
"k8s.io/kube-openapi/pkg/spec3"
kjson "sigs.k8s.io/json"
"testing"
)
func TestOpenAPITest(t *testing.T) {
client := NewFileClient(t)
// make sure we get paths
paths, err := client.Paths()
if err != nil {
t.Fatalf("error fetching paths: %v", err)
}
if len(paths) == 0 {
t.Error("empty paths")
}
// spot check specific paths
expectedPaths := []string{
"api/v1",
"apis/apps/v1",
"apis/batch/v1",
"apis/networking.k8s.io/v1alpha1",
"apis/discovery.k8s.io/v1",
}
for _, p := range expectedPaths {
if _, ok := paths[p]; !ok {
t.Fatalf("expected %s", p)
}
}
// make sure all paths can load
for path, gv := range paths {
data, err := gv.Schema("application/json")
if err != nil {
t.Fatalf("error reading schema for %v: %v", path, err)
}
o := &spec3.OpenAPI{}
stricterrs, err := kjson.UnmarshalStrict(data, o)
if err != nil {
t.Fatalf("error unmarshaling schema for %v: %v", path, err)
}
if len(stricterrs) > 0 {
t.Fatalf("strict errors unmarshaling schema for %v: %v", path, stricterrs)
}
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,172 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi3
import (
"encoding/json"
"fmt"
"sort"
"strings"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/openapi"
"k8s.io/kube-openapi/pkg/spec3"
)
// Root interface defines functions implemented against the root
// OpenAPI V3 document. The root OpenAPI V3 document maps the
// API Server relative url for all GroupVersions to the relative
// url for the OpenAPI relative url. Example for single GroupVersion
// apps/v1:
//
// "apis/apps/v1": {
// "ServerRelativeURL": "/openapi/v3/apis/apps/v1?hash=<HASH>"
// }
type Root interface {
// GroupVersions returns every GroupVersion for which there is an
// OpenAPI V3 GroupVersion document. Returns an error for problems
// retrieving or parsing the OpenAPI V3 root document.
GroupVersions() ([]schema.GroupVersion, error)
// GVSpec returns the specification for all the resources in a
// GroupVersion as a pointer to a spec3.OpenAPI struct.
// Returns an error for problems retrieving or parsing the root
// document or GroupVersion OpenAPI V3 document.
GVSpec(gv schema.GroupVersion) (*spec3.OpenAPI, error)
// GVSpecAsMap returns the specification for all the resources in a
// GroupVersion as unstructured bytes. Returns an error for
// problems retrieving or parsing the root or GroupVersion
// OpenAPI V3 document.
GVSpecAsMap(gv schema.GroupVersion) (map[string]interface{}, error)
}
// root implements the Root interface, and encapsulates the
// fields to retrieve, store the parsed OpenAPI V3 root document.
type root struct {
// OpenAPI client to retrieve the OpenAPI V3 documents.
client openapi.Client
}
// Validate root implements the Root interface.
var _ Root = &root{}
// NewRoot returns a structure implementing the Root interface,
// created with the passed rest client.
func NewRoot(client openapi.Client) Root {
return &root{client: client}
}
func (r *root) GroupVersions() ([]schema.GroupVersion, error) {
paths, err := r.client.Paths()
if err != nil {
return nil, err
}
// Example GroupVersion API path: "apis/apps/v1"
gvs := make([]schema.GroupVersion, 0, len(paths))
for gvAPIPath := range paths {
gv, err := pathToGroupVersion(gvAPIPath)
if err != nil {
// Ignore paths which do not parse to GroupVersion
continue
}
gvs = append(gvs, gv)
}
// Sort GroupVersions alphabetically
sort.Slice(gvs, func(i, j int) bool {
return gvs[i].String() < gvs[j].String()
})
return gvs, nil
}
func (r *root) GVSpec(gv schema.GroupVersion) (*spec3.OpenAPI, error) {
openAPISchemaBytes, err := r.retrieveGVBytes(gv)
if err != nil {
return nil, err
}
// Unmarshal the downloaded Group/Version bytes into the spec3.OpenAPI struct.
var parsedV3Schema spec3.OpenAPI
err = json.Unmarshal(openAPISchemaBytes, &parsedV3Schema)
return &parsedV3Schema, err
}
func (r *root) GVSpecAsMap(gv schema.GroupVersion) (map[string]interface{}, error) {
gvOpenAPIBytes, err := r.retrieveGVBytes(gv)
if err != nil {
return nil, err
}
// GroupVersion bytes into unstructured map[string] -> empty interface.
var gvMap map[string]interface{}
err = json.Unmarshal(gvOpenAPIBytes, &gvMap)
return gvMap, err
}
// retrieveGVBytes returns the schema for a passed GroupVersion as an
// unstructured slice of bytes or an error if there is a problem downloading
// or if the passed GroupVersion is not supported.
func (r *root) retrieveGVBytes(gv schema.GroupVersion) ([]byte, error) {
paths, err := r.client.Paths()
if err != nil {
return nil, err
}
apiPath := gvToAPIPath(gv)
gvOpenAPI, found := paths[apiPath]
if !found {
return nil, fmt.Errorf("GroupVersion (%s) not found in OpenAPI V3 root document", gv)
}
return gvOpenAPI.Schema(runtime.ContentTypeJSON)
}
// gvToAPIPath maps the passed GroupVersion to a relative api
// server url. Example:
//
// GroupVersion{Group: "apps", Version: "v1"} -> "apis/apps/v1".
func gvToAPIPath(gv schema.GroupVersion) string {
var resourcePath string
if len(gv.Group) == 0 {
resourcePath = fmt.Sprintf("api/%s", gv.Version)
} else {
resourcePath = fmt.Sprintf("apis/%s/%s", gv.Group, gv.Version)
}
return resourcePath
}
// pathToGroupVersion is a helper function parsing the passed relative
// url into a GroupVersion.
//
// Example: apis/apps/v1 -> GroupVersion{Group: "apps", Version: "v1"}
// Example: api/v1 -> GroupVersion{Group: "", Version: "v1"}
func pathToGroupVersion(path string) (schema.GroupVersion, error) {
var gv schema.GroupVersion
parts := strings.Split(path, "/")
if len(parts) < 2 {
return gv, fmt.Errorf("Unable to parse api relative path: %s", path)
}
apiPrefix := parts[0]
if apiPrefix == "apis" {
// Example: apis/apps (without version)
if len(parts) < 3 {
return gv, fmt.Errorf("Group without Version not allowed")
}
gv.Group = parts[1]
gv.Version = parts[2]
} else if apiPrefix == "api" {
gv.Version = parts[1]
} else {
return gv, fmt.Errorf("Unable to parse api relative path: %s", path)
}
return gv, nil
}

View File

@@ -1,319 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi3
import (
"fmt"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/openapi"
"k8s.io/client-go/openapi/openapitest"
)
func TestOpenAPIV3Root_GroupVersions(t *testing.T) {
tests := []struct {
name string
paths map[string]openapi.GroupVersion
expectedGVs []schema.GroupVersion
forcedErr error
}{
{
name: "OpenAPI V3 Root: No openapi.Paths() equals no GroupVersions.",
expectedGVs: []schema.GroupVersion{},
},
{
name: "OpenAPI V3 Root: Single openapi.Path equals one GroupVersion.",
paths: map[string]openapi.GroupVersion{
"apis/apps/v1": nil,
},
expectedGVs: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
},
},
{
name: "OpenAPI V3 Root: Multiple openapi.Paths equals multiple GroupVersions.",
paths: map[string]openapi.GroupVersion{
"apis/apps/v1": nil,
"api/v1": nil,
"apis/batch/v1beta1": nil,
},
// Alphabetical ordering, since GV's are returned sorted.
expectedGVs: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
{Group: "batch", Version: "v1beta1"},
{Group: "", Version: "v1"},
},
},
{
name: "Multiple GroupVersions, some invalid",
paths: map[string]openapi.GroupVersion{
"apis/batch/v1beta1": nil,
"api/v1": nil,
"foo/apps/v1": nil, // bad prefix
"apis/networking.k8s.io/v1alpha1": nil,
"api": nil, // No version
"apis/apps": nil, // Missing Version
"apis/apps/v1": nil,
},
// Alphabetical ordering, since GV's are returned sorted.
expectedGVs: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
{Group: "batch", Version: "v1beta1"},
{Group: "networking.k8s.io", Version: "v1alpha1"},
{Group: "", Version: "v1"},
},
},
{
name: "OpenAPI V3 Root: Forced error returns error.",
forcedErr: fmt.Errorf("openapi client error"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClient := openapitest.FakeClient{
PathsMap: test.paths,
ForcedErr: test.forcedErr,
}
root := NewRoot(fakeClient)
actualGVs, err := root.GroupVersions()
if test.forcedErr != nil {
require.Error(t, err)
} else {
require.NoError(t, err)
}
if !reflect.DeepEqual(test.expectedGVs, actualGVs) {
t.Errorf("expected GroupVersions (%s), got (%s): (%s)\n",
test.expectedGVs, actualGVs, err)
}
})
}
}
func TestOpenAPIV3Root_GVSpec(t *testing.T) {
tests := []struct {
name string
gv schema.GroupVersion
expectedPaths []string
err bool
}{
{
name: "OpenAPI V3 for apps/v1 works",
gv: schema.GroupVersion{Group: "apps", Version: "v1"},
expectedPaths: []string{
"/apis/apps/v1/",
"/apis/apps/v1/deployments",
"/apis/apps/v1/replicasets",
"/apis/apps/v1/daemonsets",
},
},
{
name: "OpenAPI V3 for networking/v1alpha1 works",
gv: schema.GroupVersion{Group: "networking.k8s.io", Version: "v1alpha1"},
expectedPaths: []string{
"/apis/networking.k8s.io/v1alpha1/",
},
},
{
name: "OpenAPI V3 for batch/v1 works",
gv: schema.GroupVersion{Group: "batch", Version: "v1"},
expectedPaths: []string{
"/apis/batch/v1/",
"/apis/batch/v1/jobs",
"/apis/batch/v1/cronjobs",
},
},
{
name: "OpenAPI V3 spec not found",
gv: schema.GroupVersion{Group: "not", Version: "found"},
err: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := openapitest.NewFileClient(t)
root := NewRoot(client)
gvSpec, err := root.GVSpec(test.gv)
if test.err {
require.Error(t, err)
return
}
require.NoError(t, err)
for _, path := range test.expectedPaths {
if _, found := gvSpec.Paths.Paths[path]; !found {
assert.True(t, found, "expected path not found (%s)\n", path)
}
}
})
}
}
func TestOpenAPIV3Root_GVSpecAsMap(t *testing.T) {
tests := []struct {
name string
gv schema.GroupVersion
expectedPaths []string
err bool
}{
{
name: "OpenAPI V3 for apps/v1 works",
gv: schema.GroupVersion{Group: "apps", Version: "v1"},
expectedPaths: []string{
"/apis/apps/v1/",
"/apis/apps/v1/deployments",
"/apis/apps/v1/replicasets",
"/apis/apps/v1/daemonsets",
},
},
{
name: "OpenAPI V3 for networking/v1alpha1 works",
gv: schema.GroupVersion{Group: "networking.k8s.io", Version: "v1alpha1"},
expectedPaths: []string{
"/apis/networking.k8s.io/v1alpha1/",
},
},
{
name: "OpenAPI V3 for batch/v1 works",
gv: schema.GroupVersion{Group: "batch", Version: "v1"},
expectedPaths: []string{
"/apis/batch/v1/",
"/apis/batch/v1/jobs",
"/apis/batch/v1/cronjobs",
},
},
{
name: "OpenAPI V3 spec not found",
gv: schema.GroupVersion{Group: "not", Version: "found"},
err: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := openapitest.NewFileClient(t)
root := NewRoot(client)
gvSpecAsMap, err := root.GVSpecAsMap(test.gv)
if test.err {
require.Error(t, err)
return
}
require.NoError(t, err)
for _, path := range test.expectedPaths {
pathsMap := gvSpecAsMap["paths"]
if _, found := pathsMap.(map[string]interface{})[path]; !found {
assert.True(t, found, "expected path not found (%s)\n", path)
}
}
})
}
}
func TestOpenAPIV3Root_GroupVersionToPath(t *testing.T) {
tests := []struct {
name string
groupVersion schema.GroupVersion
expectedPath string
}{
{
name: "OpenAPI V3 Root: Path to GroupVersion apps group",
groupVersion: schema.GroupVersion{
Group: "apps",
Version: "v1",
},
expectedPath: "apis/apps/v1",
},
{
name: "OpenAPI V3 Root: Path to GroupVersion batch group",
groupVersion: schema.GroupVersion{
Group: "batch",
Version: "v1beta1",
},
expectedPath: "apis/batch/v1beta1",
},
{
name: "OpenAPI V3 Root: Path to GroupVersion core group",
groupVersion: schema.GroupVersion{
Version: "v1",
},
expectedPath: "api/v1",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actualPath := gvToAPIPath(test.groupVersion)
assert.Equal(t, test.expectedPath, actualPath, "expected API path (%s), got (%s)",
test.expectedPath, actualPath)
})
}
}
func TestOpenAPIV3Root_PathToGroupVersion(t *testing.T) {
tests := []struct {
name string
path string
expectedGV schema.GroupVersion
expectedErr bool
}{
{
name: "OpenAPI V3 Root: Path to GroupVersion apps/v1 group",
path: "apis/apps/v1",
expectedGV: schema.GroupVersion{
Group: "apps",
Version: "v1",
},
},
{
name: "Group without Version throws error",
path: "apis/apps",
expectedErr: true,
},
{
name: "OpenAPI V3 Root: Path to GroupVersion batch group",
path: "apis/batch/v1beta1",
expectedGV: schema.GroupVersion{
Group: "batch",
Version: "v1beta1",
},
},
{
name: "OpenAPI V3 Root: Path to GroupVersion core group",
path: "api/v1",
expectedGV: schema.GroupVersion{
Version: "v1",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actualGV, err := pathToGroupVersion(test.path)
if test.expectedErr {
require.Error(t, err, "should have received error for path: %s", test.path)
} else {
require.NoError(t, err, "expected no error, got (%v)", err)
assert.Equal(t, test.expectedGV, actualGV, "expected GroupVersion (%s), got (%s)",
test.expectedGV, actualGV)
}
})
}
}

View File

@@ -43,8 +43,7 @@ var (
gitMinor string = "" // minor version, numeric possibly followed by "+"
// semantic version, derived by build scripts (see
// https://github.com/kubernetes/sig-release/blob/master/release-engineering/versioning.md#kubernetes-release-versioning
// https://kubernetes.io/releases/version-skew-policy/
// https://git.k8s.io/community/contributors/design-proposals/release/versioning.md
// for a detailed discussion of this field)
//
// TODO: This field is still called "gitVersion" for legacy

View File

@@ -481,13 +481,7 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
// Error returns any error encountered constructing the request, if any.
func (r *Request) Error() error {
return r.err
}
// URL returns the current working URL. Check the result of Error() to ensure
// that the returned URL is valid.
// URL returns the current working URL.
func (r *Request) URL() *url.URL {
p := r.pathPrefix
if r.namespaceSet && len(r.namespace) > 0 {
@@ -732,6 +726,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
@@ -791,36 +786,22 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
), nil
}
// updateRequestResultMetric increments the RequestResult metric counter,
// it should be called with the (response, err) tuple from the final
// reply from the server.
func updateRequestResultMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestResult.Increment(ctx, code, req.verb, host)
}
// updateRequestRetryMetric increments the RequestRetry metric counter,
// it should be called with the (response, err) tuple for each retry
// except for the final attempt.
func updateRequestRetryMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestRetry.IncrementRetry(ctx, code, req.verb, host)
}
func sanitize(req *Request, resp *http.Response, err error) (string, string) {
host := "none"
// updateURLMetrics is a convenience function for pushing metrics.
// It also handles corner cases for incomplete/invalid request data.
func updateURLMetrics(ctx context.Context, req *Request, resp *http.Response, err error) {
url := "none"
if req.c.base != nil {
host = req.c.base.Host
url = req.c.base.Host
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
// system so we just report them as `<error>`.
code := "<error>"
if resp != nil {
code = strconv.Itoa(resp.StatusCode)
if err != nil {
metrics.RequestResult.Increment(ctx, "<error>", req.verb, url)
} else {
// Metrics for failure codes
metrics.RequestResult.Increment(ctx, strconv.Itoa(resp.StatusCode), req.verb, url)
}
return code, host
}
// Stream formats and executes the request, and offers streaming of the response.
@@ -853,6 +834,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
return nil, err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
@@ -997,6 +979,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {

View File

@@ -31,7 +31,6 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
@@ -269,26 +268,6 @@ func TestRequestVersionedParamsFromListOptions(t *testing.T) {
}
}
func TestRequestVersionedParamsWithInvalidScheme(t *testing.T) {
parameterCodec := runtime.NewParameterCodec(runtime.NewScheme())
r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}})
r.VersionedParams(&v1.PodExecOptions{Stdin: false, Stdout: true},
parameterCodec)
if r.Error() == nil {
t.Errorf("should have recorded an error: %#v", r.params)
}
}
func TestRequestError(t *testing.T) {
// Invalid body, see TestRequestBody()
r := (&Request{}).Body([]string{"test"})
if r.Error() != r.err {
t.Errorf("getter should be identical to reference: %#v %#v", r.Error(), r.err)
}
}
func TestRequestURI(t *testing.T) {
r := (&Request{}).Param("foo", "a")
r.Prefix("other")
@@ -3011,7 +2990,6 @@ type withRateLimiterBackoffManagerAndMetrics struct {
metrics.ResultMetric
calculateBackoffSeq int64
calculateBackoffFn func(i int64) time.Duration
metrics.RetryMetric
invokeOrderGot []string
sleepsGot []string
@@ -3048,14 +3026,6 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Increment(ctx context.Context
}
}
func (lb *withRateLimiterBackoffManagerAndMetrics) IncrementRetry(ctx context.Context, code, _, _ string) {
// we are interested in the request context that is marked by this test
if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestRetry.IncrementRetry")
lb.statusCodesGot = append(lb.statusCodesGot, code)
}
}
func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do")
}
@@ -3101,17 +3071,13 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
"Client.Do",
// it's a success, so do the following:
// count the result metric, and since it's a retry,
// count the retry metric, and then update backoff parameters.
// - call metrics and update backoff parameters
"RequestResult.Increment",
"RequestRetry.IncrementRetry",
"BackoffManager.UpdateBackoff",
}
statusCodesWant := []string{
// first attempt (A): we count the result metric only
"500",
// final attempt (B): we count the result metric, and the retry metric
"200", "200",
"200",
}
tests := []struct {
@@ -3225,13 +3191,10 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
// to override as well, and we want tests to be able to run in
// parallel then we will need to provide a way for tests to
// register/deregister their own metric inerfaces.
oldRequestResult := metrics.RequestResult
oldRequestRetry := metrics.RequestRetry
old := metrics.RequestResult
metrics.RequestResult = interceptor
metrics.RequestRetry = interceptor
defer func() {
metrics.RequestResult = oldRequestResult
metrics.RequestRetry = oldRequestRetry
metrics.RequestResult = old
}()
ctx, cancel := context.WithCancel(context.Background())
@@ -3808,266 +3771,3 @@ func TestTransportConcurrency(t *testing.T) {
})
}
}
func TestRetryableConditions(t *testing.T) {
var (
methods = map[string]func(ctx context.Context, r *Request){
"Do": func(ctx context.Context, r *Request) {
r.Do(ctx)
},
"DoRaw": func(ctx context.Context, r *Request) {
r.DoRaw(ctx)
},
"Stream": func(ctx context.Context, r *Request) {
r.Stream(ctx)
},
"Watch": func(ctx context.Context, r *Request) {
w, err := r.Watch(ctx)
if err == nil {
// we need to wait here to avoid race condition.
<-w.ResultChan()
}
},
}
alwaysRetry = map[string]bool{
"Do": true,
"DoRaw": true,
"Watch": true,
"Stream": true,
}
neverRetry = map[string]bool{
"Do": false,
"DoRaw": false,
"Watch": false,
"Stream": false,
}
alwaysRetryExceptStream = map[string]bool{
"Do": true,
"DoRaw": true,
"Watch": true,
"Stream": false,
}
)
tests := []struct {
name string
verbs []string
serverReturns responseErr
retryExpectation map[string]bool
}{
// {429, Retry-After: N} - we expect retry
{
name: "server returns {429, Retry-After}",
verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
serverReturns: responseErr{response: retryAfterResponseWithCodeAndDelay(http.StatusTooManyRequests, "0"), err: nil},
retryExpectation: alwaysRetry,
},
// {5xx, Retry-After: N} - we expect retry
{
name: "server returns {503, Retry-After}",
verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
serverReturns: responseErr{response: retryAfterResponseWithCodeAndDelay(http.StatusServiceUnavailable, "0"), err: nil},
retryExpectation: alwaysRetry,
},
// 5xx, but Retry-After: N is missing - no retry is expected
{
name: "server returns 5xx, but no Retry-After",
verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusInternalServerError}, err: nil},
retryExpectation: neverRetry,
},
// 429, but Retry-After: N is missing - no retry is expected
{
name: "server returns 429 but no Retry-After",
verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusTooManyRequests}, err: nil},
retryExpectation: neverRetry,
},
// response is nil, but error is set
{
name: "server returns connection reset error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: syscall.ECONNRESET},
retryExpectation: alwaysRetryExceptStream,
},
{
name: "server returns EOF error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: io.EOF},
retryExpectation: alwaysRetryExceptStream,
},
{
name: "server returns unexpected EOF error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
retryExpectation: alwaysRetryExceptStream,
},
{
name: "server returns broken connection error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: errors.New("http: can't write HTTP request on broken connection")},
retryExpectation: alwaysRetryExceptStream,
},
{
name: "server returns GOAWAY error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: errors.New("http2: server sent GOAWAY and closed the connection")},
retryExpectation: alwaysRetryExceptStream,
},
{
name: "server returns connection reset by peer error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: errors.New("connection reset by peer")},
retryExpectation: alwaysRetryExceptStream,
},
{
name: "server returns use of closed network connection error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: errors.New("use of closed network connection")},
retryExpectation: alwaysRetryExceptStream,
},
// connection refused error never gets retried
{
name: "server returns connection refused error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: syscall.ECONNREFUSED},
retryExpectation: neverRetry,
},
{
name: "server returns connection refused error",
verbs: []string{"POST"},
serverReturns: responseErr{response: nil, err: syscall.ECONNREFUSED},
retryExpectation: neverRetry,
},
{
name: "server returns EOF error",
verbs: []string{"POST"},
serverReturns: responseErr{response: nil, err: io.EOF},
retryExpectation: map[string]bool{
"Do": false,
"DoRaw": false,
"Watch": true, // not applicable, Watch should always be GET only
"Stream": false,
},
},
// Timeout error gets retries by watch only
{
name: "server returns net.Timeout() == true error",
verbs: []string{"GET"},
serverReturns: responseErr{response: nil, err: &net.DNSError{IsTimeout: true}},
retryExpectation: map[string]bool{
"Do": false,
"DoRaw": false,
"Watch": true,
"Stream": false,
},
},
{
name: "server returns OK, never retry",
verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
retryExpectation: neverRetry,
},
{
name: "server returns {3xx, Retry-After}",
verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusMovedPermanently, Header: http.Header{"Retry-After": []string{"0"}}}, err: nil},
retryExpectation: neverRetry,
},
}
for _, test := range tests {
for method, retryExpected := range test.retryExpectation {
fn, ok := methods[method]
if !ok {
t.Fatalf("Wrong test setup, unknown method: %s", method)
}
for _, verb := range test.verbs {
t.Run(fmt.Sprintf("%s/%s/%s", test.name, method, verb), func(t *testing.T) {
var attemptsGot int
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
attemptsGot++
return test.serverReturns.response, test.serverReturns.err
})
u, _ := url.Parse("http://localhost:123" + "/apis")
req := &Request{
verb: verb,
c: &RESTClient{
base: u,
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
maxRetries: 2,
retryFn: defaultRequestRetryFn,
}
fn(context.Background(), req)
if retryExpected {
if attemptsGot != 3 {
t.Errorf("Expected attempt count: %d, but got: %d", 3, attemptsGot)
}
return
}
// we don't expect retry, so we should see the first attempt only.
if attemptsGot > 1 {
t.Errorf("Expected no retry, but got %d attempts", attemptsGot)
}
})
}
}
}
}
func TestRequestConcurrencyWithRetry(t *testing.T) {
var attempts int32
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
atomic.AddInt32(&attempts, 1)
}()
// always send a retry-after response
return &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
}, nil
})
req := &Request{
verb: "POST",
c: &RESTClient{
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
maxRetries: 9, // 10 attempts in total, including the first
retryFn: defaultRequestRetryFn,
}
concurrency := 20
wg := sync.WaitGroup{}
wg.Add(concurrency)
startCh := make(chan struct{})
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
<-startCh
req.Do(context.Background())
}()
}
close(startCh)
wg.Wait()
// we expect (concurrency*req.maxRetries+1) attempts to be recorded
expected := concurrency * (req.maxRetries + 1)
if atomic.LoadInt32(&attempts) != int32(expected) {
t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
}
}

View File

@@ -242,20 +242,8 @@ func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Resp
// parameters calculated from the (response, err) tuple from
// attempt N-1, so r.retryAfter is outdated and should not be
// referred to here.
isRetry := r.retryAfter != nil
r.retryAfter = nil
// the client finishes a single request after N attempts (1..N)
// - all attempts (1..N) are counted to the rest_client_requests_total
// metric (current behavior).
// - every attempt after the first (2..N) are counted to the
// rest_client_request_retries_total metric.
updateRequestResultMetric(ctx, request, resp, err)
if isRetry {
// this is attempt 2 or later
updateRequestRetryMetric(ctx, request, resp, err)
}
if request.c.base != nil {
if err != nil {
request.backoff.UpdateBackoff(request.URL(), err, 0)
@@ -358,12 +346,8 @@ func retryAfterResponse() *http.Response {
}
func retryAfterResponseWithDelay(delay string) *http.Response {
return retryAfterResponseWithCodeAndDelay(http.StatusInternalServerError, delay)
}
func retryAfterResponseWithCodeAndDelay(code int, delay string) *http.Response {
return &http.Response{
StatusCode: code,
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{delay}},
}
}

View File

@@ -571,11 +571,12 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
// object of K. The pre-existing keys are those in the union set of the keys in
// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
// the one present in the last delta in `f.items`. If there is no delta for K
// in `f.items`, it is the object in `f.knownObjects`
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K. If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K. Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
f.lock.Lock()
defer f.lock.Unlock()
@@ -599,54 +600,56 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
}
}
// Do deletion detection against objects in the queue
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
deletedObj = d.Obj
}
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if f.knownObjects != nil {
// Detect deletions for objects not present in the queue, but present in KnownObjects
knownKeys := f.knownObjects.ListKeys()
for _, k := range knownKeys {
if f.knownObjects == nil {
// Do deletion detection against our own list.
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
if len(f.items[k]) > 0 {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = keys.Len() + queuedDeletions
}
return nil
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {

View File

@@ -121,130 +121,6 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
}
}
func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) {
obj := mkFifoObj("foo", 2)
objV2 := mkFifoObj("foo", 3)
table := []struct {
name string
operations func(f *DeltaFIFO)
expectedDeltas Deltas
}{
{
name: "Added object should be deleted on Replace",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Replaced object should have only a single Delete",
operations: func(f *DeltaFIFO) {
f.emitDeltaTypeReplaced = true
f.Add(obj)
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Replaced, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Deleted object should have only a single Delete",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Delete(obj)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, obj},
},
},
{
name: "Synced objects should have a single delete",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Sync, obj},
{Sync, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Added objects should have a single delete on multiple Replaces",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{}, "0")
f.Replace([]interface{}{}, "1")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Added and deleted and added object should be deleted",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Delete(obj)
f.Add(objV2)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, obj},
{Added, objV2},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}},
},
},
}
for _, tt := range table {
tt := tt
t.Run(tt.name, func(t *testing.T) {
// Test with a DeltaFIFO with a backing KnownObjects
fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{}
}),
})
tt.operations(fWithKnownObjects)
actualDeltasWithKnownObjects := Pop(fWithKnownObjects)
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) {
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects)
}
if len(fWithKnownObjects.items) != 0 {
t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items)
}
// Test with a DeltaFIFO without a backing KnownObjects
fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
})
tt.operations(fWithoutKnownObjects)
actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects)
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) {
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects)
}
if len(fWithoutKnownObjects.items) != 0 {
t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items)
}
})
}
}
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
@@ -495,7 +371,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
expectedList = []Deltas{
{{Added, mkFifoObj("baz", 10)},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
{{Sync, mkFifoObj("foo", 5)}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
@@ -509,67 +385,6 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
}
}
// Now try deleting and recreating the object in the queue, then delete it by a Replace call
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
})
f.Delete(mkFifoObj("bar", 6))
f.Add(mkFifoObj("bar", 100))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList = []Deltas{
{
{Deleted, mkFifoObj("bar", 6)},
{Added, mkFifoObj("bar", 100)},
// Since "bar" has a newer object in the queue than in the state,
// it should get a tombstone key with the latest object from the queue
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
},
{{Sync, mkFifoObj("foo", 5)}},
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// Now try syncing it first to ensure the delete use the latest version
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
})
f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0")
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList = []Deltas{
{
{Sync, mkFifoObj("bar", 100)},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
},
{
{Sync, mkFifoObj("foo", 5)},
{Sync, mkFifoObj("foo", 5)},
},
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// Now try starting without an explicit KeyListerGetter
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("baz", 10))

View File

@@ -292,7 +292,7 @@ func (le *LeaderElector) release() bool {
if !le.IsLeader() {
return true
}
now := metav1.NewTime(le.clock.Now())
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaseDurationSeconds: 1,
@@ -312,7 +312,7 @@ func (le *LeaderElector) release() bool {
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.NewTime(le.clock.Now())
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
@@ -344,7 +344,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false

View File

@@ -74,15 +74,13 @@ type Reactor struct {
}
func testTryAcquireOrRenew(t *testing.T, objectType string) {
clock := clock.RealClock{}
future := clock.Now().Add(1000 * time.Hour)
past := clock.Now().Add(-1000 * time.Hour)
future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour)
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedTime time.Time
retryAfter time.Duration
reactors []Reactor
expectedEvents []string
@@ -129,33 +127,6 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led object with the lease duration seconds",
reactors: []Reactor{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
},
},
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
},
},
{
verb: "update",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
retryAfter: 3 * time.Second,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from unled object",
reactors: []Reactor{
@@ -312,17 +283,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
if test.retryAfter != 0 {
time.Sleep(test.retryAfter)
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
} else {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
le.observedRecord.AcquireTime = metav1.Time{}
@@ -414,9 +378,8 @@ func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRec
}
func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
clock := clock.RealClock{}
future := clock.Now().Add(1000 * time.Hour)
past := clock.Now().Add(-1000 * time.Hour)
future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour)
primaryType, secondaryType := multiLockType(t, objectType)
tests := []struct {
name string
@@ -875,7 +838,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
observedRecord: test.observedRecord,
observedRawRecord: test.observedRawRecord,
observedTime: test.observedTime,
clock: clock,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)

View File

@@ -68,7 +68,7 @@ const (
// name: '*'
// namespace: kube-system
EndpointsLeasesResourceLock = "endpointsleases"
// When using ConfigMapsLeasesResourceLock, you need to ensure that
// When using EndpointsLeasesResourceLock, you need to ensure that
// API Priority & Fairness is configured with non-default flow-schema
// that will catch the necessary operations on leader-election related
// configmap objects.

View File

@@ -58,12 +58,6 @@ type CallsMetric interface {
Increment(exitCode int, callStatus string)
}
// RetryMetric counts the number of retries sent to the server
// partitioned by code, method, and host.
type RetryMetric interface {
IncrementRetry(ctx context.Context, code string, method string, host string)
}
var (
// ClientCertExpiry is the expiry time of a client certificate
ClientCertExpiry ExpiryMetric = noopExpiry{}
@@ -82,9 +76,6 @@ var (
// ExecPluginCalls is the number of calls made to an exec plugin, partitioned by
// exit code and call status.
ExecPluginCalls CallsMetric = noopCalls{}
// RequestRetry is the retry metric that tracks the number of
// retries sent to the server.
RequestRetry RetryMetric = noopRetry{}
)
// RegisterOpts contains all the metrics to register. Metrics may be nil.
@@ -97,7 +88,6 @@ type RegisterOpts struct {
RateLimiterLatency LatencyMetric
RequestResult ResultMetric
ExecPluginCalls CallsMetric
RequestRetry RetryMetric
}
// Register registers metrics for the rest client to use. This can
@@ -128,9 +118,6 @@ func Register(opts RegisterOpts) {
if opts.ExecPluginCalls != nil {
ExecPluginCalls = opts.ExecPluginCalls
}
if opts.RequestRetry != nil {
RequestRetry = opts.RequestRetry
}
})
}
@@ -157,7 +144,3 @@ func (noopResult) Increment(context.Context, string, string, string) {}
type noopCalls struct{}
func (noopCalls) Increment(int, string) {}
type noopRetry struct{}
func (noopRetry) IncrementRetry(context.Context, string, string, string) {}

View File

@@ -17,7 +17,6 @@ limitations under the License.
package record
import (
"context"
"fmt"
"math/rand"
"time"
@@ -133,9 +132,7 @@ type EventBroadcaster interface {
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
// Shutdown shuts down the broadcaster. Once the broadcaster is shut
// down, it will only try to record an event in a sink once before
// giving up on it with an error message.
// Shutdown shuts down the broadcaster
Shutdown()
}
@@ -160,34 +157,31 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
return &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
}
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration)
return &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: sleepDuration,
}
}
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
eventBroadcaster := newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
eventBroadcaster.options = options
return eventBroadcaster
}
func newEventBroadcaster(broadcaster *watch.Broadcaster, sleepDuration time.Duration) *eventBroadcasterImpl {
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: broadcaster,
sleepDuration: sleepDuration,
return &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
options: options,
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(context.Background())
return eventBroadcaster
}
type eventBroadcasterImpl struct {
*watch.Broadcaster
sleepDuration time.Duration
options CorrelatorOptions
cancelationCtx context.Context
cancel func()
sleepDuration time.Duration
options CorrelatorOptions
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
@@ -197,16 +191,15 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interf
eventCorrelator := NewEventCorrelatorWithOptions(e.options)
return e.StartEventWatcher(
func(event *v1.Event) {
e.recordToSink(sink, event, eventCorrelator)
recordToSink(sink, event, eventCorrelator, e.sleepDuration)
})
}
func (e *eventBroadcasterImpl) Shutdown() {
e.Broadcaster.Shutdown()
e.cancel()
}
func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator) {
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
@@ -228,18 +221,12 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
delay := e.sleepDuration
if tries == 1 {
delay = time.Duration(float64(delay) * rand.Float64())
}
select {
case <-e.cancelationCtx.Done():
klog.Errorf("Unable to write event '%#v' (broadcaster is shut down)", event)
return
case <-time.After(delay):
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}

View File

@@ -17,7 +17,6 @@ limitations under the License.
package record
import (
"context"
"encoding/json"
"fmt"
"net/http"
@@ -453,12 +452,7 @@ func TestWriteEventError(t *testing.T) {
},
}
ev := &v1.Event{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
e := eventBroadcasterImpl{
cancelationCtx: ctx,
}
e.recordToSink(sink, ev, eventCorrelator)
recordToSink(sink, ev, eventCorrelator, 0)
if attempts != ent.attemptsWanted {
t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts)
}
@@ -488,12 +482,7 @@ func TestUpdateExpiredEvent(t *testing.T) {
ev := &v1.Event{}
ev.ResourceVersion = "updated-resource-version"
ev.Count = 2
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
e := eventBroadcasterImpl{
cancelationCtx: ctx,
}
e.recordToSink(sink, ev, eventCorrelator)
recordToSink(sink, ev, eventCorrelator, 0)
if createdEvent == nil {
t.Error("Event did not get created after patch failed")
@@ -505,33 +494,6 @@ func TestUpdateExpiredEvent(t *testing.T) {
}
}
func TestCancelEvent(t *testing.T) {
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)
attempts := 0
sink := &testEventSink{
OnCreate: func(event *v1.Event) (*v1.Event, error) {
attempts++
return nil, &errors.UnexpectedObjectError{}
},
}
ev := &v1.Event{}
// Cancel before even calling recordToSink.
ctx, cancel := context.WithCancel(context.Background())
cancel()
e := eventBroadcasterImpl{
cancelationCtx: ctx,
sleepDuration: time.Second,
}
e.recordToSink(sink, ev, eventCorrelator)
if attempts != 1 {
t.Errorf("recordToSink should have tried once, then given up immediately. Instead it tried %d times.", attempts)
}
}
func TestLotsOfEvents(t *testing.T) {
recorderCalled := make(chan struct{})
loggerCalled := make(chan struct{})

View File

@@ -41,31 +41,20 @@ func objectString(object runtime.Object, includeObject bool) string {
)
}
func annotationsString(annotations map[string]string) string {
if len(annotations) == 0 {
return ""
} else {
return " " + fmt.Sprint(annotations)
}
}
func (f *FakeRecorder) writeEvent(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
if f.Events != nil {
f.Events <- fmt.Sprintf(eventtype+" "+reason+" "+messageFmt, args...) +
objectString(object, f.IncludeObject) + annotationsString(annotations)
}
}
func (f *FakeRecorder) Event(object runtime.Object, eventtype, reason, message string) {
f.writeEvent(object, nil, eventtype, reason, "%s", message)
if f.Events != nil {
f.Events <- fmt.Sprintf("%s %s %s%s", eventtype, reason, message, objectString(object, f.IncludeObject))
}
}
func (f *FakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
f.writeEvent(object, nil, eventtype, reason, messageFmt, args...)
if f.Events != nil {
f.Events <- fmt.Sprintf(eventtype+" "+reason+" "+messageFmt, args...) + objectString(object, f.IncludeObject)
}
}
func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
f.writeEvent(object, annotations, eventtype, reason, messageFmt, args...)
f.Eventf(object, eventtype, reason, messageFmt, args...)
}
// NewFakeRecorder creates new fake event recorder with event channel with

View File

@@ -109,7 +109,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
// If we use are reloading files, we need to handle certificate rotation properly
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
if config.TLS.ReloadTLSFiles && tlsConfig != nil && tlsConfig.GetClientCertificate != nil {
if config.TLS.ReloadTLSFiles {
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
dial = dynamicCertDialer.connDialer.DialContext

View File

@@ -191,7 +191,7 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err := os.WriteFile(certFixturePath, certBuffer.Bytes(), 0644); err != nil {
return nil, nil, fmt.Errorf("failed to write cert fixture to %s: %v", certFixturePath, err)
}
if err := os.WriteFile(keyFixturePath, keyBuffer.Bytes(), 0600); err != nil {
if err := os.WriteFile(keyFixturePath, keyBuffer.Bytes(), 0644); err != nil {
return nil, nil, fmt.Errorf("failed to write key fixture to %s: %v", certFixturePath, err)
}
}