mirror of
https://github.com/kubernetes/client-go.git
synced 2026-06-12 20:26:33 +00:00
Compare commits
49 Commits
v0.29.8
...
v0.30.0-al
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f4bc0ee62 | ||
|
|
8c4efe8d07 | ||
|
|
e88f4481f2 | ||
|
|
7087b65a74 | ||
|
|
bb04dc47e3 | ||
|
|
306b201a2d | ||
|
|
d23a110967 | ||
|
|
dc532e7975 | ||
|
|
5fb8d886b5 | ||
|
|
790cfadf62 | ||
|
|
76174b8af8 | ||
|
|
3d92ad924f | ||
|
|
8092c71d36 | ||
|
|
2231ff5ae4 | ||
|
|
dc967a1ca9 | ||
|
|
89528c43be | ||
|
|
17b5405ddb | ||
|
|
657d7be98b | ||
|
|
202c415847 | ||
|
|
ffe7bf60eb | ||
|
|
7e651e598d | ||
|
|
b2c0677f40 | ||
|
|
eab7383fd8 | ||
|
|
aa81cb55f0 | ||
|
|
b13c4f4b00 | ||
|
|
fb1e77b992 | ||
|
|
a3cbf5a7be | ||
|
|
b0cf21f3e8 | ||
|
|
e8a81a3a43 | ||
|
|
49ba51431a | ||
|
|
9f8ed7bc90 | ||
|
|
ca4f3a73f7 | ||
|
|
5a0a424792 | ||
|
|
4106282f90 | ||
|
|
96e9c8d6f1 | ||
|
|
abce78fd54 | ||
|
|
0a514be22c | ||
|
|
c609c97b33 | ||
|
|
12b0e099db | ||
|
|
e3d7e06bd6 | ||
|
|
feecac4b44 | ||
|
|
9434e7539b | ||
|
|
8468c261bc | ||
|
|
2a48f1ee02 | ||
|
|
3c7c00d2d6 | ||
|
|
e9d1484a8e | ||
|
|
84a6fe7e40 | ||
|
|
7f07a956f8 | ||
|
|
785e19661f |
50
README.md
50
README.md
@@ -75,14 +75,14 @@ We will backport bugfixes--but not new features--into older versions of
|
||||
|
||||
#### Compatibility matrix
|
||||
|
||||
| | Kubernetes 1.15 | Kubernetes 1.16 | Kubernetes 1.17 | Kubernetes 1.18 | Kubernetes 1.19 | Kubernetes 1.20 |
|
||||
|-------------------------------|-----------------|-----------------|-----------------|-----------------|-----------------|-----------------|
|
||||
| `kubernetes-1.15.0` | ✓ | +- | +- | +- | +- | +- |
|
||||
| `kubernetes-1.16.0` | +- | ✓ | +- | +- | +- | +- |
|
||||
| `kubernetes-1.17.0`/`v0.17.0` | +- | +- | ✓ | +- | +- | +- |
|
||||
| `kubernetes-1.18.0`/`v0.18.0` | +- | +- | +- | ✓ | +- | +- |
|
||||
| `kubernetes-1.19.0`/`v0.19.0` | +- | +- | +- | +- | ✓ | +- |
|
||||
| `kubernetes-1.20.0`/`v0.20.0` | +- | +- | +- | +- | +- | ✓ |
|
||||
| | Kubernetes 1.23 | Kubernetes 1.24 | Kubernetes 1.25 | Kubernetes 1.26 | Kubernetes 1.27 | Kubernetes 1.28 |
|
||||
| ----------------------------- | --------------- | --------------- | --------------- | --------------- | --------------- | --------------- |
|
||||
| `kubernetes-1.23.0`/`v0.23.0` | ✓ | +- | +- | +- | +- | +- |
|
||||
| `kubernetes-1.24.0`/`v0.24.0` | +- | ✓ | +- | +- | +- | +- |
|
||||
| `kubernetes-1.25.0`/`v0.25.0` | +- | +- | ✓ | +- | +- | +- |
|
||||
| `kubernetes-1.26.0`/`v0.26.0` | +- | +- | +- | ✓ | +- | +- |
|
||||
| `kubernetes-1.27.0`/`v0.27.0` | +- | +- | +- | +- | ✓ | +- |
|
||||
| `kubernetes-1.28.0`/`v0.28.0` | +- | +- | +- | +- | +- | ✓ |
|
||||
| `HEAD` | +- | +- | +- | +- | +- | +- |
|
||||
|
||||
Key:
|
||||
@@ -102,27 +102,19 @@ Key:
|
||||
See the [CHANGELOG](./CHANGELOG.md) for a detailed description of changes
|
||||
between client-go versions.
|
||||
|
||||
| Branch | Canonical source code location | Maintenance status |
|
||||
|----------------|--------------------------------------|-------------------------------|
|
||||
| `release-1.4` | Kubernetes main repo, 1.4 branch | = - |
|
||||
| `release-1.5` | Kubernetes main repo, 1.5 branch | = - |
|
||||
| `release-2.0` | Kubernetes main repo, 1.5 branch | = - |
|
||||
| `release-3.0` | Kubernetes main repo, 1.6 branch | = - |
|
||||
| `release-4.0` | Kubernetes main repo, 1.7 branch | = - |
|
||||
| `release-5.0` | Kubernetes main repo, 1.8 branch | = - |
|
||||
| `release-6.0` | Kubernetes main repo, 1.9 branch | = - |
|
||||
| `release-7.0` | Kubernetes main repo, 1.10 branch | = - |
|
||||
| `release-8.0` | Kubernetes main repo, 1.11 branch | =- |
|
||||
| `release-9.0` | Kubernetes main repo, 1.12 branch | =- |
|
||||
| `release-10.0` | Kubernetes main repo, 1.13 branch | =- |
|
||||
| `release-11.0` | Kubernetes main repo, 1.14 branch | =- |
|
||||
| `release-12.0` | Kubernetes main repo, 1.15 branch | =- |
|
||||
| `release-13.0` | Kubernetes main repo, 1.16 branch | =- |
|
||||
| `release-14.0` | Kubernetes main repo, 1.17 branch | ✓ |
|
||||
| `release-1.18` | Kubernetes main repo, 1.18 branch | ✓ |
|
||||
| `release-1.19` | Kubernetes main repo, 1.19 branch | ✓ |
|
||||
| `release-1.20` | Kubernetes main repo, 1.20 branch | ✓ |
|
||||
| client-go HEAD | Kubernetes main repo, master branch | ✓ |
|
||||
| Branch | Canonical source code location | Maintenance status |
|
||||
| -------------- | ----------------------------------- | ------------------ |
|
||||
| `release-1.19` | Kubernetes main repo, 1.19 branch | =- |
|
||||
| `release-1.20` | Kubernetes main repo, 1.20 branch | =- |
|
||||
| `release-1.21` | Kubernetes main repo, 1.21 branch | =- |
|
||||
| `release-1.22` | Kubernetes main repo, 1.22 branch | =- |
|
||||
| `release-1.23` | Kubernetes main repo, 1.23 branch | =- |
|
||||
| `release-1.24` | Kubernetes main repo, 1.24 branch | =- |
|
||||
| `release-1.25` | Kubernetes main repo, 1.25 branch | ✓ |
|
||||
| `release-1.26` | Kubernetes main repo, 1.26 branch | ✓ |
|
||||
| `release-1.27` | Kubernetes main repo, 1.27 branch | ✓ |
|
||||
| `release-1.28` | Kubernetes main repo, 1.28 branch | ✓ |
|
||||
| client-go HEAD | Kubernetes main repo, master branch | ✓ |
|
||||
|
||||
Key:
|
||||
|
||||
|
||||
@@ -148,4 +148,4 @@ reconciliation code that performs a "read/modify-in-place/update" (or patch) wor
|
||||
// apply
|
||||
applied, err := deploymentClient.Apply(ctx, extractedDeployment, metav1.ApplyOptions{FieldManager: fieldMgr})
|
||||
*/
|
||||
package applyconfigurations
|
||||
package applyconfigurations // import "k8s.io/client-go/applyconfigurations"
|
||||
|
||||
2
discovery/testdata/apis/batch/v1.json
vendored
2
discovery/testdata/apis/batch/v1.json
vendored
File diff suppressed because one or more lines are too long
2
discovery/testdata/apis/batch/v1beta1.json
vendored
2
discovery/testdata/apis/batch/v1beta1.json
vendored
File diff suppressed because one or more lines are too long
@@ -17,4 +17,4 @@ limitations under the License.
|
||||
// Package fakeclient contains examples on how to use fakeclient in tests.
|
||||
// Note: This file is here to avoid warnings on go build since there are no
|
||||
// non-test files in this package.
|
||||
package fakeclient
|
||||
package fakeclient // import "k8s.io/client-go/examples/fake-client"
|
||||
|
||||
138
features/envvar.go
Normal file
138
features/envvar.go
Normal file
@@ -0,0 +1,138 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package features
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/naming"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// internalPackages are packages that ignored when creating a name for featureGates. These packages are in the common
|
||||
// call chains, so they'd be unhelpful as names.
|
||||
var internalPackages = []string{"k8s.io/client-go/features/envvar.go"}
|
||||
|
||||
var _ Gates = &envVarFeatureGates{}
|
||||
|
||||
// newEnvVarFeatureGates creates a feature gate that allows for registration
|
||||
// of features and checking if the features are enabled.
|
||||
//
|
||||
// On the first call to Enabled, the effective state of all known features is loaded from
|
||||
// environment variables. The environment variable read for a given feature is formed by
|
||||
// concatenating the prefix "KUBE_FEATURE_" with the feature's name.
|
||||
//
|
||||
// For example, if you have a feature named "MyFeature"
|
||||
// setting an environmental variable "KUBE_FEATURE_MyFeature"
|
||||
// will allow you to configure the state of that feature.
|
||||
//
|
||||
// Please note that environmental variables can only be set to the boolean value.
|
||||
// Incorrect values will be ignored and logged.
|
||||
func newEnvVarFeatureGates(features map[Feature]FeatureSpec) *envVarFeatureGates {
|
||||
known := map[Feature]FeatureSpec{}
|
||||
for name, spec := range features {
|
||||
known[name] = spec
|
||||
}
|
||||
|
||||
fg := &envVarFeatureGates{
|
||||
callSiteName: naming.GetNameFromCallsite(internalPackages...),
|
||||
known: known,
|
||||
}
|
||||
fg.enabled.Store(map[Feature]bool{})
|
||||
|
||||
return fg
|
||||
}
|
||||
|
||||
// envVarFeatureGates implements Gates and allows for feature registration.
|
||||
type envVarFeatureGates struct {
|
||||
// callSiteName holds the name of the file
|
||||
// that created this instance
|
||||
callSiteName string
|
||||
|
||||
// readEnvVarsOnce guards reading environmental variables
|
||||
readEnvVarsOnce sync.Once
|
||||
|
||||
// known holds known feature gates
|
||||
known map[Feature]FeatureSpec
|
||||
|
||||
// enabled holds a map[Feature]bool
|
||||
// with values explicitly set via env var
|
||||
enabled atomic.Value
|
||||
|
||||
// readEnvVars holds the boolean value which
|
||||
// indicates whether readEnvVarsOnce has been called.
|
||||
readEnvVars atomic.Bool
|
||||
}
|
||||
|
||||
// Enabled returns true if the key is enabled. If the key is not known, this call will panic.
|
||||
func (f *envVarFeatureGates) Enabled(key Feature) bool {
|
||||
if v, ok := f.getEnabledMapFromEnvVar()[key]; ok {
|
||||
return v
|
||||
}
|
||||
if v, ok := f.known[key]; ok {
|
||||
return v.Default
|
||||
}
|
||||
panic(fmt.Errorf("feature %q is not registered in FeatureGates %q", key, f.callSiteName))
|
||||
}
|
||||
|
||||
// getEnabledMapFromEnvVar will fill the enabled map on the first call.
|
||||
// This is the only time a known feature can be set to a value
|
||||
// read from the corresponding environmental variable.
|
||||
func (f *envVarFeatureGates) getEnabledMapFromEnvVar() map[Feature]bool {
|
||||
f.readEnvVarsOnce.Do(func() {
|
||||
featureGatesState := map[Feature]bool{}
|
||||
for feature, featureSpec := range f.known {
|
||||
featureState, featureStateSet := os.LookupEnv(fmt.Sprintf("KUBE_FEATURE_%s", feature))
|
||||
if !featureStateSet {
|
||||
continue
|
||||
}
|
||||
boolVal, boolErr := strconv.ParseBool(featureState)
|
||||
switch {
|
||||
case boolErr != nil:
|
||||
utilruntime.HandleError(fmt.Errorf("cannot set feature gate %q to %q, due to %v", feature, featureState, boolErr))
|
||||
case featureSpec.LockToDefault:
|
||||
if boolVal != featureSpec.Default {
|
||||
utilruntime.HandleError(fmt.Errorf("cannot set feature gate %q to %q, feature is locked to %v", feature, featureState, featureSpec.Default))
|
||||
break
|
||||
}
|
||||
featureGatesState[feature] = featureSpec.Default
|
||||
default:
|
||||
featureGatesState[feature] = boolVal
|
||||
}
|
||||
}
|
||||
f.enabled.Store(featureGatesState)
|
||||
f.readEnvVars.Store(true)
|
||||
|
||||
for feature, featureSpec := range f.known {
|
||||
if featureState, ok := featureGatesState[feature]; ok {
|
||||
klog.V(1).InfoS("Feature gate updated state", "feature", feature, "enabled", featureState)
|
||||
continue
|
||||
}
|
||||
klog.V(1).InfoS("Feature gate default state", "feature", feature, "enabled", featureSpec.Default)
|
||||
}
|
||||
})
|
||||
return f.enabled.Load().(map[Feature]bool)
|
||||
}
|
||||
|
||||
func (f *envVarFeatureGates) hasAlreadyReadEnvVar() bool {
|
||||
return f.readEnvVars.Load()
|
||||
}
|
||||
156
features/envvar_test.go
Normal file
156
features/envvar_test.go
Normal file
@@ -0,0 +1,156 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package features
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEnvVarFeatureGates(t *testing.T) {
|
||||
defaultTestFeatures := map[Feature]FeatureSpec{
|
||||
"TestAlpha": {
|
||||
Default: false,
|
||||
LockToDefault: false,
|
||||
PreRelease: "Alpha",
|
||||
},
|
||||
"TestBeta": {
|
||||
Default: true,
|
||||
LockToDefault: false,
|
||||
PreRelease: "Beta",
|
||||
},
|
||||
}
|
||||
expectedDefaultFeaturesState := map[Feature]bool{"TestAlpha": false, "TestBeta": true}
|
||||
|
||||
copyExpectedStateMap := func(toCopy map[Feature]bool) map[Feature]bool {
|
||||
m := map[Feature]bool{}
|
||||
for k, v := range toCopy {
|
||||
m[k] = v
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
scenarios := []struct {
|
||||
name string
|
||||
features map[Feature]FeatureSpec
|
||||
envVariables map[string]string
|
||||
expectedFeaturesState map[Feature]bool
|
||||
expectedInternalEnabledFeatureState map[Feature]bool
|
||||
}{
|
||||
{
|
||||
name: "can add empty features",
|
||||
},
|
||||
{
|
||||
name: "no env var, features get Defaults assigned",
|
||||
features: defaultTestFeatures,
|
||||
expectedFeaturesState: expectedDefaultFeaturesState,
|
||||
},
|
||||
{
|
||||
name: "incorrect env var, feature gets Default assigned",
|
||||
features: defaultTestFeatures,
|
||||
envVariables: map[string]string{"TestAlpha": "true"},
|
||||
expectedFeaturesState: expectedDefaultFeaturesState,
|
||||
},
|
||||
{
|
||||
name: "correct env var changes the feature gets state",
|
||||
features: defaultTestFeatures,
|
||||
envVariables: map[string]string{"KUBE_FEATURE_TestAlpha": "true"},
|
||||
expectedFeaturesState: func() map[Feature]bool {
|
||||
expectedDefaultFeaturesStateCopy := copyExpectedStateMap(expectedDefaultFeaturesState)
|
||||
expectedDefaultFeaturesStateCopy["TestAlpha"] = true
|
||||
return expectedDefaultFeaturesStateCopy
|
||||
}(),
|
||||
expectedInternalEnabledFeatureState: map[Feature]bool{"TestAlpha": true},
|
||||
},
|
||||
{
|
||||
name: "incorrect env var value gets ignored",
|
||||
features: defaultTestFeatures,
|
||||
envVariables: map[string]string{"KUBE_FEATURE_TestAlpha": "TrueFalse"},
|
||||
expectedFeaturesState: expectedDefaultFeaturesState,
|
||||
},
|
||||
{
|
||||
name: "empty env var value gets ignored",
|
||||
features: defaultTestFeatures,
|
||||
envVariables: map[string]string{"KUBE_FEATURE_TestAlpha": ""},
|
||||
expectedFeaturesState: expectedDefaultFeaturesState,
|
||||
},
|
||||
{
|
||||
name: "a feature LockToDefault wins",
|
||||
features: map[Feature]FeatureSpec{
|
||||
"TestAlpha": {
|
||||
Default: true,
|
||||
LockToDefault: true,
|
||||
PreRelease: "Alpha",
|
||||
},
|
||||
},
|
||||
envVariables: map[string]string{"KUBE_FEATURE_TestAlpha": "False"},
|
||||
expectedFeaturesState: map[Feature]bool{"TestAlpha": true},
|
||||
},
|
||||
{
|
||||
name: "setting a feature to LockToDefault changes the internal state",
|
||||
features: map[Feature]FeatureSpec{
|
||||
"TestAlpha": {
|
||||
Default: true,
|
||||
LockToDefault: true,
|
||||
PreRelease: "Alpha",
|
||||
},
|
||||
},
|
||||
envVariables: map[string]string{"KUBE_FEATURE_TestAlpha": "True"},
|
||||
expectedFeaturesState: map[Feature]bool{"TestAlpha": true},
|
||||
expectedInternalEnabledFeatureState: map[Feature]bool{"TestAlpha": true},
|
||||
},
|
||||
}
|
||||
for _, scenario := range scenarios {
|
||||
t.Run(scenario.name, func(t *testing.T) {
|
||||
for k, v := range scenario.envVariables {
|
||||
t.Setenv(k, v)
|
||||
}
|
||||
target := newEnvVarFeatureGates(scenario.features)
|
||||
|
||||
for expectedFeature, expectedValue := range scenario.expectedFeaturesState {
|
||||
actualValue := target.Enabled(expectedFeature)
|
||||
require.Equal(t, actualValue, expectedValue, "expected feature=%v, to be=%v, not=%v", expectedFeature, expectedValue, actualValue)
|
||||
}
|
||||
|
||||
enabledInternalMap := target.enabled.Load().(map[Feature]bool)
|
||||
require.Len(t, enabledInternalMap, len(scenario.expectedInternalEnabledFeatureState))
|
||||
|
||||
for expectedFeature, expectedInternalPresence := range scenario.expectedInternalEnabledFeatureState {
|
||||
featureInternalValue, featureSet := enabledInternalMap[expectedFeature]
|
||||
require.Equal(t, expectedInternalPresence, featureSet, "feature %v present = %v, expected = %v", expectedFeature, featureSet, expectedInternalPresence)
|
||||
|
||||
expectedFeatureInternalValue := scenario.expectedFeaturesState[expectedFeature]
|
||||
require.Equal(t, expectedFeatureInternalValue, featureInternalValue)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnvVarFeatureGatesEnabledPanic(t *testing.T) {
|
||||
target := newEnvVarFeatureGates(nil)
|
||||
require.PanicsWithError(t, fmt.Errorf("feature %q is not registered in FeatureGates %q", "UnknownFeature", target.callSiteName).Error(), func() { target.Enabled("UnknownFeature") })
|
||||
}
|
||||
|
||||
func TestHasAlreadyReadEnvVar(t *testing.T) {
|
||||
target := newEnvVarFeatureGates(nil)
|
||||
require.False(t, target.hasAlreadyReadEnvVar())
|
||||
|
||||
_ = target.getEnabledMapFromEnvVar()
|
||||
require.True(t, target.hasAlreadyReadEnvVar())
|
||||
}
|
||||
143
features/features.go
Normal file
143
features/features.go
Normal file
@@ -0,0 +1,143 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package features
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// NOTE: types Feature, FeatureSpec, prerelease (and its values)
|
||||
// were duplicated from the component-base repository
|
||||
//
|
||||
// for more information please refer to https://docs.google.com/document/d/1g9BGCRw-7ucUxO6OtCWbb3lfzUGA_uU9178wLdXAIfs
|
||||
|
||||
const (
|
||||
// Values for PreRelease.
|
||||
Alpha = prerelease("ALPHA")
|
||||
Beta = prerelease("BETA")
|
||||
GA = prerelease("")
|
||||
|
||||
// Deprecated
|
||||
Deprecated = prerelease("DEPRECATED")
|
||||
)
|
||||
|
||||
type prerelease string
|
||||
|
||||
type Feature string
|
||||
|
||||
type FeatureSpec struct {
|
||||
// Default is the default enablement state for the feature
|
||||
Default bool
|
||||
// LockToDefault indicates that the feature is locked to its default and cannot be changed
|
||||
LockToDefault bool
|
||||
// PreRelease indicates the maturity level of the feature
|
||||
PreRelease prerelease
|
||||
}
|
||||
|
||||
// Gates indicates whether a given feature is enabled or not.
|
||||
type Gates interface {
|
||||
// Enabled returns true if the key is enabled.
|
||||
Enabled(key Feature) bool
|
||||
}
|
||||
|
||||
// Registry represents an external feature gates registry.
|
||||
type Registry interface {
|
||||
// Add adds existing feature gates to the provided registry.
|
||||
//
|
||||
// As of today, this method is used by AddFeaturesToExistingFeatureGates and
|
||||
// ReplaceFeatureGates to take control of the features exposed by this library.
|
||||
Add(map[Feature]FeatureSpec) error
|
||||
}
|
||||
|
||||
// FeatureGates returns the feature gates exposed by this library.
|
||||
//
|
||||
// By default, only the default features gates will be returned.
|
||||
// The default implementation allows controlling the features
|
||||
// via environmental variables.
|
||||
// For example, if you have a feature named "MyFeature"
|
||||
// setting an environmental variable "KUBE_FEATURE_MyFeature"
|
||||
// will allow you to configure the state of that feature.
|
||||
//
|
||||
// Please note that the actual set of the feature gates
|
||||
// might be overwritten by calling ReplaceFeatureGates method.
|
||||
func FeatureGates() Gates {
|
||||
return featureGates.Load().(*featureGatesWrapper).Gates
|
||||
}
|
||||
|
||||
// AddFeaturesToExistingFeatureGates adds the default feature gates to the provided registry.
|
||||
// Usually this function is combined with ReplaceFeatureGates to take control of the
|
||||
// features exposed by this library.
|
||||
func AddFeaturesToExistingFeatureGates(registry Registry) error {
|
||||
return registry.Add(defaultKubernetesFeatureGates)
|
||||
}
|
||||
|
||||
// ReplaceFeatureGates overwrites the default implementation of the feature gates
|
||||
// used by this library.
|
||||
//
|
||||
// Useful for binaries that would like to have full control of the features
|
||||
// exposed by this library, such as allowing consumers of a binary
|
||||
// to interact with the features via a command line flag.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// // first, register client-go's features to your registry.
|
||||
// clientgofeaturegate.AddFeaturesToExistingFeatureGates(utilfeature.DefaultMutableFeatureGate)
|
||||
// // then replace client-go's feature gates implementation with your implementation
|
||||
// clientgofeaturegate.ReplaceFeatureGates(utilfeature.DefaultMutableFeatureGate)
|
||||
func ReplaceFeatureGates(newFeatureGates Gates) {
|
||||
if replaceFeatureGatesWithWarningIndicator(newFeatureGates) {
|
||||
utilruntime.HandleError(errors.New("the default feature gates implementation has already been used and now it's being overwritten. This might lead to unexpected behaviour. Check your initialization order"))
|
||||
}
|
||||
}
|
||||
|
||||
func replaceFeatureGatesWithWarningIndicator(newFeatureGates Gates) bool {
|
||||
shouldProduceWarning := false
|
||||
|
||||
if defaultFeatureGates, ok := FeatureGates().(*envVarFeatureGates); ok {
|
||||
if defaultFeatureGates.hasAlreadyReadEnvVar() {
|
||||
shouldProduceWarning = true
|
||||
}
|
||||
}
|
||||
wrappedFeatureGates := &featureGatesWrapper{newFeatureGates}
|
||||
featureGates.Store(wrappedFeatureGates)
|
||||
|
||||
return shouldProduceWarning
|
||||
}
|
||||
|
||||
func init() {
|
||||
envVarGates := newEnvVarFeatureGates(defaultKubernetesFeatureGates)
|
||||
|
||||
wrappedFeatureGates := &featureGatesWrapper{envVarGates}
|
||||
featureGates.Store(wrappedFeatureGates)
|
||||
}
|
||||
|
||||
// featureGatesWrapper a thin wrapper to satisfy featureGates variable (atomic.Value).
|
||||
// That is, all calls to Store for a given Value must use values of the same concrete type.
|
||||
type featureGatesWrapper struct {
|
||||
Gates
|
||||
}
|
||||
|
||||
var (
|
||||
// featureGates is a shared global FeatureGates.
|
||||
//
|
||||
// Top-level commands/options setup that needs to modify this feature gates
|
||||
// should use AddFeaturesToExistingFeatureGates followed by ReplaceFeatureGates.
|
||||
featureGates = &atomic.Value{}
|
||||
)
|
||||
40
features/features_test.go
Normal file
40
features/features_test.go
Normal file
@@ -0,0 +1,40 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package features
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestAddFeaturesToExistingFeatureGates ensures that
|
||||
// the defaultKubernetesFeatureGates are added to a test feature gates registry.
|
||||
func TestAddFeaturesToExistingFeatureGates(t *testing.T) {
|
||||
fakeFeatureGates := &fakeRegistry{}
|
||||
require.NoError(t, AddFeaturesToExistingFeatureGates(fakeFeatureGates))
|
||||
require.Equal(t, defaultKubernetesFeatureGates, fakeFeatureGates.specs)
|
||||
}
|
||||
|
||||
type fakeRegistry struct {
|
||||
specs map[Feature]FeatureSpec
|
||||
}
|
||||
|
||||
func (f *fakeRegistry) Add(specs map[Feature]FeatureSpec) error {
|
||||
f.specs = specs
|
||||
return nil
|
||||
}
|
||||
49
features/known_features.go
Normal file
49
features/known_features.go
Normal file
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package features
|
||||
|
||||
const (
|
||||
// Every feature gate should add method here following this template:
|
||||
//
|
||||
// // owner: @username
|
||||
// // alpha: v1.4
|
||||
// MyFeature featuregate.Feature = "MyFeature"
|
||||
//
|
||||
// Feature gates should be listed in alphabetical, case-sensitive
|
||||
// (upper before any lower case character) order. This reduces the risk
|
||||
// of code conflicts because changes are more likely to be scattered
|
||||
// across the file.
|
||||
|
||||
// owner: @p0lyn0mial
|
||||
// beta: v1.30
|
||||
//
|
||||
// Allow the client to get a stream of individual items instead of chunking from the server.
|
||||
//
|
||||
// NOTE:
|
||||
// The feature is disabled in Beta by default because
|
||||
// it will only be turned on for selected control plane component(s).
|
||||
WatchListClient Feature = "WatchListClient"
|
||||
)
|
||||
|
||||
// defaultKubernetesFeatureGates consists of all known Kubernetes-specific feature keys.
|
||||
//
|
||||
// To add a new feature, define a key for it above and add it here.
|
||||
// After registering with the binary, the features are, by default, controllable using environment variables.
|
||||
// For more details, please see envVarFeatureGates implementation.
|
||||
var defaultKubernetesFeatureGates = map[Feature]FeatureSpec{
|
||||
WatchListClient: {Default: false, PreRelease: Beta},
|
||||
}
|
||||
79
features/testing/features_init_test.go
Normal file
79
features/testing/features_init_test.go
Normal file
@@ -0,0 +1,79 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"k8s.io/client-go/features"
|
||||
)
|
||||
|
||||
func TestDriveInitDefaultFeatureGates(t *testing.T) {
|
||||
featureGates := features.FeatureGates()
|
||||
assertFunctionPanicsWithMessage(t, func() { featureGates.Enabled("FakeFeatureGate") }, "features.FeatureGates().Enabled", fmt.Sprintf("feature %q is not registered in FeatureGate", "FakeFeatureGate"))
|
||||
|
||||
fakeFeatureGates := &alwaysEnabledFakeGates{}
|
||||
require.True(t, fakeFeatureGates.Enabled("FakeFeatureGate"))
|
||||
|
||||
features.ReplaceFeatureGates(fakeFeatureGates)
|
||||
featureGates = features.FeatureGates()
|
||||
|
||||
assertFeatureGatesType(t, featureGates)
|
||||
require.True(t, featureGates.Enabled("FakeFeatureGate"))
|
||||
}
|
||||
|
||||
type alwaysEnabledFakeGates struct{}
|
||||
|
||||
func (f *alwaysEnabledFakeGates) Enabled(features.Feature) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func assertFeatureGatesType(t *testing.T, fg features.Gates) {
|
||||
_, ok := fg.(*alwaysEnabledFakeGates)
|
||||
if !ok {
|
||||
t.Fatalf("passed features.FeatureGates() is NOT of type *alwaysEnabledFakeGates, it is of type = %T", fg)
|
||||
}
|
||||
}
|
||||
|
||||
func assertFunctionPanicsWithMessage(t *testing.T, f func(), fName, errMessage string) {
|
||||
didPanic, panicMessage := didFunctionPanic(f)
|
||||
if !didPanic {
|
||||
t.Fatalf("function %q did not panicked", fName)
|
||||
}
|
||||
|
||||
panicError, ok := panicMessage.(error)
|
||||
if !ok || !strings.Contains(panicError.Error(), errMessage) {
|
||||
t.Fatalf("func %q should panic with error message:\t%#v\n\tPanic value:\t%#v\n", fName, errMessage, panicMessage)
|
||||
}
|
||||
}
|
||||
|
||||
func didFunctionPanic(f func()) (didPanic bool, panicMessage interface{}) {
|
||||
didPanic = true
|
||||
|
||||
defer func() {
|
||||
panicMessage = recover()
|
||||
}()
|
||||
|
||||
f()
|
||||
didPanic = false
|
||||
|
||||
return
|
||||
}
|
||||
24
go.mod
24
go.mod
@@ -2,7 +2,7 @@
|
||||
|
||||
module k8s.io/client-go
|
||||
|
||||
go 1.21.3
|
||||
go 1.21
|
||||
|
||||
require (
|
||||
github.com/evanphx/json-patch v4.12.0+incompatible
|
||||
@@ -19,15 +19,15 @@ require (
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.8.4
|
||||
golang.org/x/net v0.17.0
|
||||
golang.org/x/net v0.19.0
|
||||
golang.org/x/oauth2 v0.10.0
|
||||
golang.org/x/term v0.13.0
|
||||
golang.org/x/term v0.17.0
|
||||
golang.org/x/time v0.3.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
k8s.io/api v0.0.0-20231104011324-cca653eefe74
|
||||
k8s.io/apimachinery v0.0.0-20231104004456-12dc3f82eb47
|
||||
k8s.io/klog/v2 v2.110.1
|
||||
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
|
||||
k8s.io/api v0.30.0-alpha.3
|
||||
k8s.io/apimachinery v0.30.0-alpha.3
|
||||
k8s.io/klog/v2 v2.120.1
|
||||
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e
|
||||
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.4.1
|
||||
@@ -37,7 +37,7 @@ require (
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
|
||||
github.com/go-logr/logr v1.3.0 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.6 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||
github.com/go-openapi/swag v0.22.3 // indirect
|
||||
@@ -52,8 +52,8 @@ require (
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
golang.org/x/sys v0.17.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
@@ -61,6 +61,6 @@ require (
|
||||
)
|
||||
|
||||
replace (
|
||||
k8s.io/api => k8s.io/api v0.0.0-20231104011324-cca653eefe74
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20231104004456-12dc3f82eb47
|
||||
k8s.io/api => k8s.io/api v0.30.0-alpha.3
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.30.0-alpha.3
|
||||
)
|
||||
|
||||
48
go.sum
48
go.sum
@@ -8,8 +8,8 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER
|
||||
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
|
||||
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
|
||||
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
|
||||
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
|
||||
github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
|
||||
@@ -74,10 +74,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
|
||||
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
|
||||
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
|
||||
github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
|
||||
github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
|
||||
github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY=
|
||||
github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM=
|
||||
github.com/onsi/gomega v1.31.0 h1:54UJxxj6cPInHS3a35wm6BK/F9nHYueZ1NVujHDrnXE=
|
||||
github.com/onsi/gomega v1.31.0/go.mod h1:DW9aCi7U6Yi40wNVAvT6kzFnEVEI5n3DloYBiKiT6zk=
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
@@ -109,8 +109,8 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
|
||||
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
|
||||
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
|
||||
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -119,23 +119,23 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
|
||||
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
|
||||
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
|
||||
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U=
|
||||
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
|
||||
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss=
|
||||
golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
|
||||
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
|
||||
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
@@ -157,14 +157,14 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
k8s.io/api v0.0.0-20231104011324-cca653eefe74 h1:A/WZDLAcaGLXkFBbekgwUWKJBuRtPShuANcONbv3OrA=
|
||||
k8s.io/api v0.0.0-20231104011324-cca653eefe74/go.mod h1:6Z3XP8HABddpcRWQa0OQUhKH7fQ9DkfDiAfwsZYoav8=
|
||||
k8s.io/apimachinery v0.0.0-20231104004456-12dc3f82eb47 h1:wWtw59NclKzCKEt2E5e65INhGVzoUW0ll6X2AyDrsVU=
|
||||
k8s.io/apimachinery v0.0.0-20231104004456-12dc3f82eb47/go.mod h1:yFk3nwBh/jXlkMvRKH7BKtX7saT1lRmmGV6Ru0cTSUA=
|
||||
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
|
||||
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
|
||||
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=
|
||||
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
|
||||
k8s.io/api v0.30.0-alpha.3 h1:EcbaDi8WjoR8QdQS6LKd8oP0qjODWxfKdVj5U8WM1P0=
|
||||
k8s.io/api v0.30.0-alpha.3/go.mod h1:gUziZ7QreMQgwigIm0O6q1xN4w2DPIs6PwP9Ha3c9dQ=
|
||||
k8s.io/apimachinery v0.30.0-alpha.3 h1:9FoqT1Wc+48DJ+mYkbmZd3n4351u9YbGnQSPnVWUwWM=
|
||||
k8s.io/apimachinery v0.30.0-alpha.3/go.mod h1:/862Kkwje5hhHGJWPKiaHuov2c6mw6uCXWikV9kOIP4=
|
||||
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
|
||||
k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
|
||||
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e h1:snPmy96t93RredGRjKfMFt+gvxuVAncqSAyBveJtr4Q=
|
||||
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
|
||||
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
|
||||
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
|
||||
|
||||
@@ -15,4 +15,4 @@ limitations under the License.
|
||||
*/
|
||||
|
||||
// Package informers provides generated informers for Kubernetes APIs.
|
||||
package informers
|
||||
package informers // import "k8s.io/client-go/informers"
|
||||
|
||||
@@ -16,4 +16,4 @@ limitations under the License.
|
||||
|
||||
// Package kubernetes holds packages which implement a clientset for Kubernetes
|
||||
// APIs.
|
||||
package kubernetes
|
||||
package kubernetes // import "k8s.io/client-go/kubernetes"
|
||||
|
||||
@@ -15,4 +15,4 @@ limitations under the License.
|
||||
*/
|
||||
|
||||
// Package listers provides generated listers for Kubernetes APIs.
|
||||
package listers
|
||||
package listers // import "k8s.io/client-go/listers"
|
||||
|
||||
@@ -191,7 +191,7 @@ func (c *client) Get(ctx context.Context, name string, opts metav1.GetOptions, s
|
||||
}
|
||||
obj, err := result.Get()
|
||||
if runtime.IsNotRegisteredError(err) {
|
||||
klog.V(5).Infof("Unable to retrieve PartialObjectMetadata: %#v", err)
|
||||
klog.FromContext(ctx).V(5).Info("Could not retrieve PartialObjectMetadata", "err", err)
|
||||
rawBytes, err := result.Raw()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -227,7 +227,7 @@ func (c *client) List(ctx context.Context, opts metav1.ListOptions) (*metav1.Par
|
||||
}
|
||||
obj, err := result.Get()
|
||||
if runtime.IsNotRegisteredError(err) {
|
||||
klog.V(5).Infof("Unable to retrieve PartialObjectMetadataList: %#v", err)
|
||||
klog.FromContext(ctx).V(5).Info("Could not retrieve PartialObjectMetadataList", "err", err)
|
||||
rawBytes, err := result.Raw()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
)
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
@@ -55,7 +56,7 @@ func TestClient(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
handler func(t *testing.T, w http.ResponseWriter, req *http.Request)
|
||||
want func(t *testing.T, client *Client)
|
||||
want func(ctx context.Context, t *testing.T, client *Client)
|
||||
}{
|
||||
{
|
||||
name: "GET is able to convert a JSON object to PartialObjectMetadata",
|
||||
@@ -77,8 +78,8 @@ func TestClient(t *testing.T) {
|
||||
},
|
||||
})
|
||||
},
|
||||
want: func(t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(context.TODO(), "name", metav1.GetOptions{})
|
||||
want: func(ctx context.Context, t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(ctx, "name", metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -125,8 +126,8 @@ func TestClient(t *testing.T) {
|
||||
},
|
||||
})
|
||||
},
|
||||
want: func(t *testing.T, client *Client) {
|
||||
objs, err := client.Resource(gvr).Namespace("ns").List(context.TODO(), metav1.ListOptions{})
|
||||
want: func(ctx context.Context, t *testing.T, client *Client) {
|
||||
objs, err := client.Resource(gvr).Namespace("ns").List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -167,8 +168,8 @@ func TestClient(t *testing.T) {
|
||||
},
|
||||
})
|
||||
},
|
||||
want: func(t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(context.TODO(), "name", metav1.GetOptions{})
|
||||
want: func(ctx context.Context, t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(ctx, "name", metav1.GetOptions{})
|
||||
if err == nil || !runtime.IsMissingKind(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -196,8 +197,8 @@ func TestClient(t *testing.T) {
|
||||
},
|
||||
})
|
||||
},
|
||||
want: func(t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(context.TODO(), "name", metav1.GetOptions{})
|
||||
want: func(ctx context.Context, t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(ctx, "name", metav1.GetOptions{})
|
||||
if err == nil || !runtime.IsMissingVersion(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -224,8 +225,8 @@ func TestClient(t *testing.T) {
|
||||
ObjectMeta: metav1.ObjectMeta{},
|
||||
})
|
||||
},
|
||||
want: func(t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(context.TODO(), "name", metav1.GetOptions{})
|
||||
want: func(ctx context.Context, t *testing.T, client *Client) {
|
||||
obj, err := client.Resource(gvr).Namespace("ns").Get(ctx, "name", metav1.GetOptions{})
|
||||
if err == nil || !strings.Contains(err.Error(), "object does not appear to match the ObjectMeta schema") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -254,8 +255,8 @@ func TestClient(t *testing.T) {
|
||||
}
|
||||
writeJSON(t, w, statusOK)
|
||||
},
|
||||
want: func(t *testing.T, client *Client) {
|
||||
err := client.Resource(gvr).Namespace("ns").Delete(context.TODO(), "name", metav1.DeleteOptions{})
|
||||
want: func(ctx context.Context, t *testing.T, client *Client) {
|
||||
err := client.Resource(gvr).Namespace("ns").Delete(ctx, "name", metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -282,8 +283,8 @@ func TestClient(t *testing.T) {
|
||||
|
||||
writeJSON(t, w, statusOK)
|
||||
},
|
||||
want: func(t *testing.T, client *Client) {
|
||||
err := client.Resource(gvr).Namespace("ns").DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
|
||||
want: func(ctx context.Context, t *testing.T, client *Client) {
|
||||
err := client.Resource(gvr).Namespace("ns").DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -296,9 +297,10 @@ func TestClient(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { tt.handler(t, w, req) }))
|
||||
defer s.Close()
|
||||
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
cfg := ConfigFor(&rest.Config{Host: s.URL})
|
||||
client := NewForConfigOrDie(cfg).(*Client)
|
||||
tt.want(t, client)
|
||||
tt.want(ctx, t, client)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5235,7 +5235,7 @@
|
||||
"description": "SecurityContext holds pod-level security attributes and common container settings. Optional: Defaults to empty. See type description for default values of each field."
|
||||
},
|
||||
"serviceAccount": {
|
||||
"description": "DeprecatedServiceAccount is a depreciated alias for ServiceAccountName. Deprecated: Use serviceAccountName instead.",
|
||||
"description": "DeprecatedServiceAccount is a deprecated alias for ServiceAccountName. Deprecated: Use serviceAccountName instead.",
|
||||
"type": "string"
|
||||
},
|
||||
"serviceAccountName": {
|
||||
|
||||
@@ -3677,7 +3677,7 @@
|
||||
"description": "SecurityContext holds pod-level security attributes and common container settings. Optional: Defaults to empty. See type description for default values of each field."
|
||||
},
|
||||
"serviceAccount": {
|
||||
"description": "DeprecatedServiceAccount is a depreciated alias for ServiceAccountName. Deprecated: Use serviceAccountName instead.",
|
||||
"description": "DeprecatedServiceAccount is a deprecated alias for ServiceAccountName. Deprecated: Use serviceAccountName instead.",
|
||||
"type": "string"
|
||||
},
|
||||
"serviceAccountName": {
|
||||
|
||||
@@ -2851,7 +2851,7 @@
|
||||
"description": "SecurityContext holds pod-level security attributes and common container settings. Optional: Defaults to empty. See type description for default values of each field."
|
||||
},
|
||||
"serviceAccount": {
|
||||
"description": "DeprecatedServiceAccount is a depreciated alias for ServiceAccountName. Deprecated: Use serviceAccountName instead.",
|
||||
"description": "DeprecatedServiceAccount is a deprecated alias for ServiceAccountName. Deprecated: Use serviceAccountName instead.",
|
||||
"type": "string"
|
||||
},
|
||||
"serviceAccountName": {
|
||||
|
||||
@@ -19,4 +19,4 @@ limitations under the License.
|
||||
// It doesn't have any of its own types -- it's just necessary to
|
||||
// get the expected behavior out of runtime.Scheme.ConvertToVersion
|
||||
// and associated methods.
|
||||
package appsint
|
||||
package appsint // import "k8s.io/client-go/scale/scheme/appsint"
|
||||
|
||||
@@ -19,4 +19,4 @@ limitations under the License.
|
||||
// Package scheme contains a runtime.Scheme to be used for serializing
|
||||
// and deserializing different versions of Scale, and for converting
|
||||
// in between them.
|
||||
package scheme
|
||||
package scheme // import "k8s.io/client-go/scale/scheme"
|
||||
|
||||
@@ -19,4 +19,4 @@ limitations under the License.
|
||||
// It doesn't have any of its own types -- it's just necessary to
|
||||
// get the expected behavior out of runtime.Scheme.ConvertToVersion
|
||||
// and associated methods.
|
||||
package extensionsint
|
||||
package extensionsint // import "k8s.io/client-go/scale/scheme/extensionsint"
|
||||
|
||||
3
tools/cache/index.go
vendored
3
tools/cache/index.go
vendored
@@ -50,8 +50,7 @@ type Indexer interface {
|
||||
// GetIndexers return the indexers
|
||||
GetIndexers() Indexers
|
||||
|
||||
// AddIndexers adds more indexers to this store. If you call this after you already have data
|
||||
// in the store, the results are undefined.
|
||||
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
|
||||
AddIndexers(newIndexers Indexers) error
|
||||
}
|
||||
|
||||
|
||||
18
tools/cache/reflector.go
vendored
18
tools/cache/reflector.go
vendored
@@ -43,6 +43,7 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
"k8s.io/utils/trace"
|
||||
)
|
||||
|
||||
@@ -107,7 +108,9 @@ type Reflector struct {
|
||||
// might result in an increased memory consumption of the APIServer.
|
||||
//
|
||||
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
|
||||
UseWatchList bool
|
||||
//
|
||||
// TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work.
|
||||
UseWatchList *bool
|
||||
}
|
||||
|
||||
// ResourceVersionUpdater is an interface that allows store implementation to
|
||||
@@ -237,8 +240,12 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
|
||||
r.expectedGVK = getExpectedGVKFromObject(expectedType)
|
||||
}
|
||||
|
||||
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
|
||||
r.UseWatchList = true
|
||||
// don't overwrite UseWatchList if already set
|
||||
// because the higher layers (e.g. storage/cacher) disabled it on purpose
|
||||
if r.UseWatchList == nil {
|
||||
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
|
||||
r.UseWatchList = ptr.To(true)
|
||||
}
|
||||
}
|
||||
|
||||
return r
|
||||
@@ -325,9 +332,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
|
||||
var err error
|
||||
var w watch.Interface
|
||||
fallbackToList := !r.UseWatchList
|
||||
useWatchList := ptr.Deref(r.UseWatchList, false)
|
||||
fallbackToList := !useWatchList
|
||||
|
||||
if r.UseWatchList {
|
||||
if useWatchList {
|
||||
w, err = r.watchList(stopCh)
|
||||
if w == nil && err == nil {
|
||||
// stopCh was closed
|
||||
|
||||
5
tools/cache/reflector_watchlist_test.go
vendored
5
tools/cache/reflector_watchlist_test.go
vendored
@@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
func TestWatchList(t *testing.T) {
|
||||
@@ -415,7 +416,7 @@ func TestWatchList(t *testing.T) {
|
||||
listWatcher.customListResponse = scenario.podList
|
||||
listWatcher.closeAfterListRequests = scenario.closeAfterListRequests
|
||||
if scenario.disableUseWatchList {
|
||||
reflector.UseWatchList = false
|
||||
reflector.UseWatchList = ptr.To(false)
|
||||
}
|
||||
|
||||
err := reflector.ListAndWatch(stopCh)
|
||||
@@ -505,7 +506,7 @@ func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
r.UseWatchList = true
|
||||
r.UseWatchList = ptr.To(true)
|
||||
|
||||
return lw, s, r, stopCh
|
||||
}
|
||||
|
||||
4
tools/cache/shared_informer.go
vendored
4
tools/cache/shared_informer.go
vendored
@@ -540,8 +540,8 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.started {
|
||||
return fmt.Errorf("informer has already started")
|
||||
if s.stopped {
|
||||
return fmt.Errorf("indexer was not added because it has stopped already")
|
||||
}
|
||||
|
||||
return s.indexer.AddIndexers(indexers)
|
||||
|
||||
78
tools/cache/shared_informer_test.go
vendored
78
tools/cache/shared_informer_test.go
vendored
@@ -26,6 +26,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/stretchr/testify/assert"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -117,6 +120,81 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
|
||||
return s.processor.getListener(h) != nil
|
||||
}
|
||||
|
||||
func TestIndexer(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
|
||||
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
|
||||
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
|
||||
source.Add(pod1)
|
||||
source.Add(pod2)
|
||||
|
||||
// create the shared informer and resync every 1s
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
err := informer.AddIndexers(map[string]IndexFunc{
|
||||
"labels": func(obj interface{}) ([]string, error) {
|
||||
res := []string{}
|
||||
for k := range obj.(*v1.Pod).Labels {
|
||||
res = append(res, k)
|
||||
}
|
||||
return res, nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
go informer.Run(stop)
|
||||
WaitForCacheSync(stop, informer.HasSynced)
|
||||
|
||||
cmpOps := cmpopts.SortSlices(func(a, b any) bool {
|
||||
return a.(*v1.Pod).Name < b.(*v1.Pod).Name
|
||||
})
|
||||
|
||||
// We should be able to lookup by index
|
||||
res, err := informer.GetIndexer().ByIndex("labels", "a")
|
||||
assert.NoError(err)
|
||||
if diff := cmp.Diff([]any{pod1}, res); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
// Adding an item later is fine as well
|
||||
source.Add(pod3)
|
||||
// Event is async, need to poll
|
||||
assert.Eventually(func() bool {
|
||||
res, _ := informer.GetIndexer().ByIndex("labels", "a")
|
||||
return cmp.Diff([]any{pod1, pod3}, res, cmpOps) == ""
|
||||
}, time.Second*3, time.Millisecond)
|
||||
|
||||
// Adding an index later is also fine
|
||||
err = informer.AddIndexers(map[string]IndexFunc{
|
||||
"labels-again": func(obj interface{}) ([]string, error) {
|
||||
res := []string{}
|
||||
for k := range obj.(*v1.Pod).Labels {
|
||||
res = append(res, k)
|
||||
}
|
||||
return res, nil
|
||||
},
|
||||
})
|
||||
assert.NoError(err)
|
||||
|
||||
// Should be immediately available
|
||||
res, err = informer.GetIndexer().ByIndex("labels-again", "a")
|
||||
assert.NoError(err)
|
||||
if diff := cmp.Diff([]any{pod1, pod3}, res, cmpOps); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
if got := informer.GetIndexer().ListIndexFuncValues("labels"); !sets.New(got...).Equal(sets.New("a", "b")) {
|
||||
t.Fatalf("got %v", got)
|
||||
}
|
||||
if got := informer.GetIndexer().ListIndexFuncValues("labels-again"); !sets.New(got...).Equal(sets.New("a", "b")) {
|
||||
t.Fatalf("got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListenerResyncPeriods(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
|
||||
108
tools/cache/thread_safe_store.go
vendored
108
tools/cache/thread_safe_store.go
vendored
@@ -52,8 +52,7 @@ type ThreadSafeStore interface {
|
||||
ByIndex(indexName, indexedValue string) ([]interface{}, error)
|
||||
GetIndexers() Indexers
|
||||
|
||||
// AddIndexers adds more indexers to this store. If you call this after you already have data
|
||||
// in the store, the results are undefined.
|
||||
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
|
||||
AddIndexers(newIndexers Indexers) error
|
||||
// Resync is a no-op and is deprecated
|
||||
Resync() error
|
||||
@@ -135,50 +134,66 @@ func (i *storeIndex) addIndexers(newIndexers Indexers) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateSingleIndex modifies the objects location in the named index:
|
||||
// - for create you must provide only the newObj
|
||||
// - for update you must provide both the oldObj and the newObj
|
||||
// - for delete you must provide only the oldObj
|
||||
// updateSingleIndex must be called from a function that already has a lock on the cache
|
||||
func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
|
||||
var oldIndexValues, indexValues []string
|
||||
indexFunc, ok := i.indexers[name]
|
||||
if !ok {
|
||||
// Should never happen. Caller is responsible for ensuring this exists, and should call with lock
|
||||
// held to avoid any races.
|
||||
panic(fmt.Errorf("indexer %q does not exist", name))
|
||||
}
|
||||
if oldObj != nil {
|
||||
var err error
|
||||
oldIndexValues, err = indexFunc(oldObj)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||
}
|
||||
} else {
|
||||
oldIndexValues = oldIndexValues[:0]
|
||||
}
|
||||
|
||||
if newObj != nil {
|
||||
var err error
|
||||
indexValues, err = indexFunc(newObj)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||
}
|
||||
} else {
|
||||
indexValues = indexValues[:0]
|
||||
}
|
||||
|
||||
index := i.indices[name]
|
||||
if index == nil {
|
||||
index = Index{}
|
||||
i.indices[name] = index
|
||||
}
|
||||
|
||||
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
|
||||
// We optimize for the most common case where indexFunc returns a single value which has not been changed
|
||||
return
|
||||
}
|
||||
|
||||
for _, value := range oldIndexValues {
|
||||
i.deleteKeyFromIndex(key, value, index)
|
||||
}
|
||||
for _, value := range indexValues {
|
||||
i.addKeyToIndex(key, value, index)
|
||||
}
|
||||
}
|
||||
|
||||
// updateIndices modifies the objects location in the managed indexes:
|
||||
// - for create you must provide only the newObj
|
||||
// - for update you must provide both the oldObj and the newObj
|
||||
// - for delete you must provide only the oldObj
|
||||
// updateIndices must be called from a function that already has a lock on the cache
|
||||
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
|
||||
var oldIndexValues, indexValues []string
|
||||
var err error
|
||||
for name, indexFunc := range i.indexers {
|
||||
if oldObj != nil {
|
||||
oldIndexValues, err = indexFunc(oldObj)
|
||||
} else {
|
||||
oldIndexValues = oldIndexValues[:0]
|
||||
}
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||
}
|
||||
|
||||
if newObj != nil {
|
||||
indexValues, err = indexFunc(newObj)
|
||||
} else {
|
||||
indexValues = indexValues[:0]
|
||||
}
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||
}
|
||||
|
||||
index := i.indices[name]
|
||||
if index == nil {
|
||||
index = Index{}
|
||||
i.indices[name] = index
|
||||
}
|
||||
|
||||
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
|
||||
// We optimize for the most common case where indexFunc returns a single value which has not been changed
|
||||
continue
|
||||
}
|
||||
|
||||
for _, value := range oldIndexValues {
|
||||
i.deleteKeyFromIndex(key, value, index)
|
||||
}
|
||||
for _, value := range indexValues {
|
||||
i.addKeyToIndex(key, value, index)
|
||||
}
|
||||
for name := range i.indexers {
|
||||
i.updateSingleIndex(name, oldObj, newObj, key)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,11 +354,18 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
if len(c.items) > 0 {
|
||||
return fmt.Errorf("cannot add indexers to running index")
|
||||
if err := c.index.addIndexers(newIndexers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.index.addIndexers(newIndexers)
|
||||
// If there are already items, index them
|
||||
for key, item := range c.items {
|
||||
for name := range newIndexers {
|
||||
c.index.updateSingleIndex(name, nil, item, key)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *threadSafeMap) Resync() error {
|
||||
|
||||
@@ -16,4 +16,4 @@ limitations under the License.
|
||||
|
||||
// +k8s:deepcopy-gen=package
|
||||
|
||||
package api
|
||||
package api // import "k8s.io/client-go/tools/clientcmd/api"
|
||||
|
||||
@@ -18,4 +18,4 @@ limitations under the License.
|
||||
// +k8s:deepcopy-gen=package
|
||||
// +k8s:defaulter-gen=Kind
|
||||
|
||||
package v1
|
||||
package v1 // import "k8s.io/client-go/tools/clientcmd/api/v1"
|
||||
|
||||
@@ -72,6 +72,13 @@ type ClientConfig interface {
|
||||
ConfigAccess() ConfigAccess
|
||||
}
|
||||
|
||||
// OverridingClientConfig is used to enable overrriding the raw KubeConfig
|
||||
type OverridingClientConfig interface {
|
||||
ClientConfig
|
||||
// MergedRawConfig return the RawConfig merged with all overrides.
|
||||
MergedRawConfig() (clientcmdapi.Config, error)
|
||||
}
|
||||
|
||||
type PersistAuthProviderConfigForUser func(user string) restclient.AuthProviderConfigPersister
|
||||
|
||||
type promptedCredentials struct {
|
||||
@@ -91,22 +98,22 @@ type DirectClientConfig struct {
|
||||
}
|
||||
|
||||
// NewDefaultClientConfig creates a DirectClientConfig using the config.CurrentContext as the context name
|
||||
func NewDefaultClientConfig(config clientcmdapi.Config, overrides *ConfigOverrides) ClientConfig {
|
||||
func NewDefaultClientConfig(config clientcmdapi.Config, overrides *ConfigOverrides) OverridingClientConfig {
|
||||
return &DirectClientConfig{config, config.CurrentContext, overrides, nil, NewDefaultClientConfigLoadingRules(), promptedCredentials{}}
|
||||
}
|
||||
|
||||
// NewNonInteractiveClientConfig creates a DirectClientConfig using the passed context name and does not have a fallback reader for auth information
|
||||
func NewNonInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, configAccess ConfigAccess) ClientConfig {
|
||||
func NewNonInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, configAccess ConfigAccess) OverridingClientConfig {
|
||||
return &DirectClientConfig{config, contextName, overrides, nil, configAccess, promptedCredentials{}}
|
||||
}
|
||||
|
||||
// NewInteractiveClientConfig creates a DirectClientConfig using the passed context name and a reader in case auth information is not provided via files or flags
|
||||
func NewInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, fallbackReader io.Reader, configAccess ConfigAccess) ClientConfig {
|
||||
func NewInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, fallbackReader io.Reader, configAccess ConfigAccess) OverridingClientConfig {
|
||||
return &DirectClientConfig{config, contextName, overrides, fallbackReader, configAccess, promptedCredentials{}}
|
||||
}
|
||||
|
||||
// NewClientConfigFromBytes takes your kubeconfig and gives you back a ClientConfig
|
||||
func NewClientConfigFromBytes(configBytes []byte) (ClientConfig, error) {
|
||||
func NewClientConfigFromBytes(configBytes []byte) (OverridingClientConfig, error) {
|
||||
config, err := Load(configBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -129,6 +136,40 @@ func (config *DirectClientConfig) RawConfig() (clientcmdapi.Config, error) {
|
||||
return config.config, nil
|
||||
}
|
||||
|
||||
// MergedRawConfig returns the raw kube config merged with the overrides
|
||||
func (config *DirectClientConfig) MergedRawConfig() (clientcmdapi.Config, error) {
|
||||
if err := config.ConfirmUsable(); err != nil {
|
||||
return clientcmdapi.Config{}, err
|
||||
}
|
||||
merged := config.config.DeepCopy()
|
||||
|
||||
// set the AuthInfo merged with overrides in the merged config
|
||||
mergedAuthInfo, err := config.getAuthInfo()
|
||||
if err != nil {
|
||||
return clientcmdapi.Config{}, err
|
||||
}
|
||||
mergedAuthInfoName, _ := config.getAuthInfoName()
|
||||
merged.AuthInfos[mergedAuthInfoName] = &mergedAuthInfo
|
||||
|
||||
// set the Context merged with overrides in the merged config
|
||||
mergedContext, err := config.getContext()
|
||||
if err != nil {
|
||||
return clientcmdapi.Config{}, err
|
||||
}
|
||||
mergedContextName, _ := config.getContextName()
|
||||
merged.Contexts[mergedContextName] = &mergedContext
|
||||
merged.CurrentContext = mergedContextName
|
||||
|
||||
// set the Cluster merged with overrides in the merged config
|
||||
configClusterInfo, err := config.getCluster()
|
||||
if err != nil {
|
||||
return clientcmdapi.Config{}, err
|
||||
}
|
||||
configClusterName, _ := config.getClusterName()
|
||||
merged.Clusters[configClusterName] = &configClusterInfo
|
||||
return *merged, nil
|
||||
}
|
||||
|
||||
// ClientConfig implements ClientConfig
|
||||
func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
|
||||
// check that getAuthInfo, getContext, and getCluster do not return an error.
|
||||
|
||||
@@ -1013,3 +1013,68 @@ func TestCleanANSIEscapeCodes(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeRawConfigDoOverride(t *testing.T) {
|
||||
const (
|
||||
server = "https://anything.com:8080"
|
||||
token = "the-token"
|
||||
modifiedServer = "http://localhost:8081"
|
||||
modifiedToken = "modified-token"
|
||||
)
|
||||
config := createValidTestConfig()
|
||||
|
||||
// add another context which to modify with overrides
|
||||
config.Clusters["modify"] = &clientcmdapi.Cluster{
|
||||
Server: server,
|
||||
}
|
||||
config.AuthInfos["modify"] = &clientcmdapi.AuthInfo{
|
||||
Token: token,
|
||||
}
|
||||
config.Contexts["modify"] = &clientcmdapi.Context{
|
||||
Cluster: "modify",
|
||||
AuthInfo: "modify",
|
||||
Namespace: "modify",
|
||||
}
|
||||
|
||||
// create overrides for the modify context
|
||||
overrides := &ConfigOverrides{
|
||||
ClusterInfo: clientcmdapi.Cluster{
|
||||
Server: modifiedServer,
|
||||
},
|
||||
Context: clientcmdapi.Context{
|
||||
Namespace: "foobar",
|
||||
Cluster: "modify",
|
||||
AuthInfo: "modify",
|
||||
},
|
||||
AuthInfo: clientcmdapi.AuthInfo{
|
||||
Token: modifiedToken,
|
||||
},
|
||||
CurrentContext: "modify",
|
||||
}
|
||||
|
||||
cut := NewDefaultClientConfig(*config, overrides)
|
||||
act, err := cut.MergedRawConfig()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// ensure overrides were applied to "modify"
|
||||
actContext := act.CurrentContext
|
||||
if actContext != "modify" {
|
||||
t.Errorf("Expected context %v, got %v", "modify", actContext)
|
||||
}
|
||||
if act.Clusters[actContext].Server != "http://localhost:8081" {
|
||||
t.Errorf("Expected server %v, got %v", "http://localhost:8081", act.Clusters[actContext].Server)
|
||||
}
|
||||
if act.Contexts[actContext].Namespace != "foobar" {
|
||||
t.Errorf("Expected namespace %v, got %v", "foobar", act.Contexts[actContext].Namespace)
|
||||
}
|
||||
|
||||
// ensure context "clean" was not touched
|
||||
if act.Clusters["clean"].Server != config.Clusters["clean"].Server {
|
||||
t.Errorf("Expected server %v, got %v", config.Clusters["clean"].Server, act.Clusters["clean"].Server)
|
||||
}
|
||||
if act.Contexts["clean"].Namespace != config.Contexts["clean"].Namespace {
|
||||
t.Errorf("Expected namespace %v, got %v", config.Contexts["clean"].Namespace, act.Contexts["clean"].Namespace)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -404,7 +404,15 @@ type eventBroadcasterAdapterImpl struct {
|
||||
|
||||
// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
|
||||
// migration of individual components to the new Event API.
|
||||
//
|
||||
//logcheck:context // NewEventBroadcasterAdapterWithContext should be used instead because record.NewBroadcaster is called and works better when a context is supplied (contextual logging, cancellation).
|
||||
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
|
||||
return NewEventBroadcasterAdapterWithContext(context.Background(), client)
|
||||
}
|
||||
|
||||
// NewEventBroadcasterAdapterWithContext creates a wrapper around new and legacy broadcasters to simplify
|
||||
// migration of individual components to the new Event API.
|
||||
func NewEventBroadcasterAdapterWithContext(ctx context.Context, client clientset.Interface) EventBroadcasterAdapter {
|
||||
eventClient := &eventBroadcasterAdapterImpl{}
|
||||
if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
|
||||
eventClient.eventsv1Client = client.EventsV1()
|
||||
@@ -414,7 +422,7 @@ func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdap
|
||||
// we create it unconditionally because its overhead is minor and will simplify using usage
|
||||
// patterns of this library in all components.
|
||||
eventClient.coreClient = client.CoreV1()
|
||||
eventClient.coreBroadcaster = record.NewBroadcaster()
|
||||
eventClient.coreBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
|
||||
return eventClient
|
||||
}
|
||||
|
||||
|
||||
@@ -325,7 +325,22 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
|
||||
AcquireTime: now,
|
||||
}
|
||||
|
||||
// 1. obtain or create the ElectionRecord
|
||||
// 1. fast path for the leader to update optimistically assuming that the record observed
|
||||
// last time is the current version.
|
||||
if le.IsLeader() && le.isLeaseValid(now.Time) {
|
||||
oldObservedRecord := le.getObservedRecord()
|
||||
leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
|
||||
leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
|
||||
|
||||
err := le.config.Lock.Update(ctx, leaderElectionRecord)
|
||||
if err == nil {
|
||||
le.setObservedRecord(&leaderElectionRecord)
|
||||
return true
|
||||
}
|
||||
klog.Errorf("Failed to update lock optimitically: %v, falling back to slow path", err)
|
||||
}
|
||||
|
||||
// 2. obtain or create the ElectionRecord
|
||||
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
|
||||
if err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
@@ -342,24 +357,23 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// 2. Record obtained, check the Identity & Time
|
||||
// 3. Record obtained, check the Identity & Time
|
||||
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
|
||||
le.setObservedRecord(oldLeaderElectionRecord)
|
||||
|
||||
le.observedRawRecord = oldLeaderElectionRawRecord
|
||||
}
|
||||
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
|
||||
le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
|
||||
!le.IsLeader() {
|
||||
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
|
||||
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
|
||||
return false
|
||||
}
|
||||
|
||||
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
|
||||
// 4. We're going to try to update. The leaderElectionRecord is set to it's default
|
||||
// here. Let's correct it before updating.
|
||||
if le.IsLeader() {
|
||||
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
|
||||
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
|
||||
le.metrics.slowpathExercised(le.config.Name)
|
||||
} else {
|
||||
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
|
||||
}
|
||||
@@ -400,6 +414,10 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (le *LeaderElector) isLeaseValid(now time.Time) bool {
|
||||
return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
|
||||
}
|
||||
|
||||
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
|
||||
// Protect critical sections with lock.
|
||||
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
|
||||
|
||||
@@ -315,6 +315,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
|
||||
observedRawRecord: observedRawRecord,
|
||||
observedTime: test.observedTime,
|
||||
clock: clock,
|
||||
metrics: globalMetricsFactory.newLeaderMetrics(),
|
||||
}
|
||||
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
|
||||
if test.retryAfter != 0 {
|
||||
@@ -491,6 +492,7 @@ func testReleaseLease(t *testing.T, objectType string) {
|
||||
observedRawRecord: observedRawRecord,
|
||||
observedTime: test.observedTime,
|
||||
clock: clock.RealClock{},
|
||||
metrics: globalMetricsFactory.newLeaderMetrics(),
|
||||
}
|
||||
if !le.tryAcquireOrRenew(context.Background()) {
|
||||
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
|
||||
@@ -610,14 +612,18 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
updates++
|
||||
// Skip initial two fast path renews
|
||||
if updates%2 == 1 && updates < 5 {
|
||||
return true, nil, context.Canceled
|
||||
}
|
||||
|
||||
// Second update (first renew) should return our canceled error
|
||||
// FakeClient doesn't do anything with the context so we're doing this ourselves
|
||||
if updates == 2 {
|
||||
if updates == 4 {
|
||||
close(onRenewCalled)
|
||||
<-onRenewResume
|
||||
return true, nil, context.Canceled
|
||||
} else if updates == 3 {
|
||||
} else if updates == 5 {
|
||||
// We update the lock after the cancellation to release it
|
||||
// This wg is to avoid the data race on lockObj
|
||||
defer wg.Done()
|
||||
@@ -668,8 +674,12 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
updates++
|
||||
// Always skip fast path renew
|
||||
if updates%2 == 1 {
|
||||
return true, nil, context.Canceled
|
||||
}
|
||||
// Second update (first renew) should release the lock
|
||||
if updates == 2 {
|
||||
if updates == 4 {
|
||||
// We update the lock after the cancellation to release it
|
||||
// This wg is to avoid the data race on lockObj
|
||||
defer wg.Done()
|
||||
@@ -813,3 +823,156 @@ func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFastPathLeaderElection(t *testing.T) {
|
||||
objectType := "leases"
|
||||
var (
|
||||
lockObj runtime.Object
|
||||
updates int
|
||||
lockOps []string
|
||||
cancelFunc func()
|
||||
)
|
||||
resetVars := func() {
|
||||
lockObj = nil
|
||||
updates = 0
|
||||
lockOps = []string{}
|
||||
cancelFunc = nil
|
||||
}
|
||||
lec := LeaderElectionConfig{
|
||||
LeaseDuration: 15 * time.Second,
|
||||
RenewDeadline: 2 * time.Second,
|
||||
RetryPeriod: 1 * time.Second,
|
||||
|
||||
Callbacks: LeaderCallbacks{
|
||||
OnNewLeader: func(identity string) {},
|
||||
OnStoppedLeading: func() {},
|
||||
OnStartedLeading: func(context.Context) {
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
reactors []Reactor
|
||||
expectedLockOps []string
|
||||
}{
|
||||
{
|
||||
name: "Exercise fast path after lock acquired",
|
||||
reactors: []Reactor{
|
||||
{
|
||||
verb: "get",
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
lockOps = append(lockOps, "get")
|
||||
if lockObj != nil {
|
||||
return true, lockObj, nil
|
||||
}
|
||||
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
|
||||
},
|
||||
},
|
||||
{
|
||||
verb: "create",
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
lockOps = append(lockOps, "create")
|
||||
lockObj = action.(fakeclient.CreateAction).GetObject()
|
||||
return true, lockObj, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
verb: "update",
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
updates++
|
||||
lockOps = append(lockOps, "update")
|
||||
if updates == 2 {
|
||||
cancelFunc()
|
||||
}
|
||||
lockObj = action.(fakeclient.UpdateAction).GetObject()
|
||||
return true, lockObj, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedLockOps: []string{"get", "create", "update", "update"},
|
||||
},
|
||||
{
|
||||
name: "Fallback to slow path after fast path fails",
|
||||
reactors: []Reactor{
|
||||
{
|
||||
verb: "get",
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
lockOps = append(lockOps, "get")
|
||||
if lockObj != nil {
|
||||
return true, lockObj, nil
|
||||
}
|
||||
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
|
||||
},
|
||||
},
|
||||
{
|
||||
verb: "create",
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
lockOps = append(lockOps, "create")
|
||||
lockObj = action.(fakeclient.CreateAction).GetObject()
|
||||
return true, lockObj, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
verb: "update",
|
||||
objectType: objectType,
|
||||
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
|
||||
updates++
|
||||
lockOps = append(lockOps, "update")
|
||||
switch updates {
|
||||
case 2:
|
||||
return true, nil, errors.NewConflict(action.(fakeclient.UpdateAction).GetResource().GroupResource(), "fake conflict", nil)
|
||||
case 4:
|
||||
cancelFunc()
|
||||
}
|
||||
lockObj = action.(fakeclient.UpdateAction).GetObject()
|
||||
return true, lockObj, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedLockOps: []string{"get", "create", "update", "update", "get", "update", "update"},
|
||||
},
|
||||
}
|
||||
|
||||
for i := range tests {
|
||||
test := &tests[i]
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
resetVars()
|
||||
|
||||
recorder := record.NewFakeRecorder(100)
|
||||
resourceLockConfig := rl.ResourceLockConfig{
|
||||
Identity: "baz",
|
||||
EventRecorder: recorder,
|
||||
}
|
||||
c := &fake.Clientset{}
|
||||
for _, reactor := range test.reactors {
|
||||
c.AddReactor(reactor.verb, objectType, reactor.reaction)
|
||||
}
|
||||
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
|
||||
t.Errorf("unreachable action. testclient called too many times: %+v", action)
|
||||
return true, nil, fmt.Errorf("unreachable action")
|
||||
})
|
||||
lock, err := rl.New("leases", "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
|
||||
if err != nil {
|
||||
t.Fatal("resourcelock.New() = ", err)
|
||||
}
|
||||
|
||||
lec.Lock = lock
|
||||
elector, err := NewLeaderElector(lec)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to create leader elector: ", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancelFunc = cancel
|
||||
|
||||
elector.Run(ctx)
|
||||
assert.Equal(t, test.expectedLockOps, lockOps, "Expected lock ops %q, got %q", test.expectedLockOps, lockOps)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,24 +26,26 @@ import (
|
||||
type leaderMetricsAdapter interface {
|
||||
leaderOn(name string)
|
||||
leaderOff(name string)
|
||||
slowpathExercised(name string)
|
||||
}
|
||||
|
||||
// GaugeMetric represents a single numerical value that can arbitrarily go up
|
||||
// and down.
|
||||
type SwitchMetric interface {
|
||||
// LeaderMetric instruments metrics used in leader election.
|
||||
type LeaderMetric interface {
|
||||
On(name string)
|
||||
Off(name string)
|
||||
SlowpathExercised(name string)
|
||||
}
|
||||
|
||||
type noopMetric struct{}
|
||||
|
||||
func (noopMetric) On(name string) {}
|
||||
func (noopMetric) Off(name string) {}
|
||||
func (noopMetric) On(name string) {}
|
||||
func (noopMetric) Off(name string) {}
|
||||
func (noopMetric) SlowpathExercised(name string) {}
|
||||
|
||||
// defaultLeaderMetrics expects the caller to lock before setting any metrics.
|
||||
type defaultLeaderMetrics struct {
|
||||
// leader's value indicates if the current process is the owner of name lease
|
||||
leader SwitchMetric
|
||||
leader LeaderMetric
|
||||
}
|
||||
|
||||
func (m *defaultLeaderMetrics) leaderOn(name string) {
|
||||
@@ -60,19 +62,27 @@ func (m *defaultLeaderMetrics) leaderOff(name string) {
|
||||
m.leader.Off(name)
|
||||
}
|
||||
|
||||
func (m *defaultLeaderMetrics) slowpathExercised(name string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.leader.SlowpathExercised(name)
|
||||
}
|
||||
|
||||
type noMetrics struct{}
|
||||
|
||||
func (noMetrics) leaderOn(name string) {}
|
||||
func (noMetrics) leaderOff(name string) {}
|
||||
func (noMetrics) leaderOn(name string) {}
|
||||
func (noMetrics) leaderOff(name string) {}
|
||||
func (noMetrics) slowpathExercised(name string) {}
|
||||
|
||||
// MetricsProvider generates various metrics used by the leader election.
|
||||
type MetricsProvider interface {
|
||||
NewLeaderMetric() SwitchMetric
|
||||
NewLeaderMetric() LeaderMetric
|
||||
}
|
||||
|
||||
type noopMetricsProvider struct{}
|
||||
|
||||
func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric {
|
||||
func (noopMetricsProvider) NewLeaderMetric() LeaderMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
|
||||
@@ -198,16 +198,29 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
|
||||
ctx := c.Context
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
} else {
|
||||
}
|
||||
// The are two scenarios where it makes no sense to wait for context cancelation:
|
||||
// - The context was nil.
|
||||
// - The context was context.Background() to begin with.
|
||||
//
|
||||
// Both cases get checked here.
|
||||
haveCtxCancelation := ctx.Done() == nil
|
||||
|
||||
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
|
||||
|
||||
if haveCtxCancelation {
|
||||
// Calling Shutdown is not required when a context was provided:
|
||||
// when the context is canceled, this goroutine will shut down
|
||||
// the broadcaster.
|
||||
//
|
||||
// If Shutdown is called first, then this goroutine will
|
||||
// also stop.
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
<-eventBroadcaster.cancelationCtx.Done()
|
||||
eventBroadcaster.Broadcaster.Shutdown()
|
||||
}()
|
||||
}
|
||||
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
|
||||
|
||||
return eventBroadcaster
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
"k8s.io/utils/integer"
|
||||
)
|
||||
|
||||
type backoffEntry struct {
|
||||
@@ -100,7 +99,7 @@ func (p *Backoff) Next(id string, eventTime time.Time) {
|
||||
} else {
|
||||
delay := entry.backoff * 2 // exponential
|
||||
delay += p.jitter(entry.backoff) // add some jitter to the delay
|
||||
entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
|
||||
entry.backoff = min(delay, p.maxDuration)
|
||||
}
|
||||
entry.lastUpdate = p.Clock.Now()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user