diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker.go new file mode 100644 index 00000000000..ddf77b8e8b0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker.go @@ -0,0 +1,155 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package feature + +import ( + "context" + "fmt" + "sync" + + clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apiserver/pkg/storage" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" +) + +var ( + // Define these static versions to use for checking version of etcd, issue on kubernetes #123192 + v3_4_31 = version.MustParseSemantic("3.4.31") + v3_5_0 = version.MustParseSemantic("3.5.0") + v3_5_13 = version.MustParseSemantic("3.5.13") + + // DefaultFeatureSupportChecker is a shared global etcd FeatureSupportChecker. + DefaultFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() +) + +// FeatureSupportChecker to define Supports functions. +type FeatureSupportChecker interface { + // Supports check if the feature is supported or not by checking internal cache. + // By default all calls to this function before calling CheckClient returns false. + // Returns true if all endpoints in etcd clients are supporting the feature. + Supports(feature storage.Feature) (bool, error) + // CheckClient works with etcd client to recalcualte feature support and cache it internally. + // All etcd clients should support feature to cause `Supports` return true. + // If client A supports and client B doesn't support the feature, the `Supports` will + // first return true at client A initializtion and then return false on client B + // initialzation, it can flip the support at runtime. + CheckClient(ctx context.Context, c client, feature storage.Feature) error +} + +type defaultFeatureSupportChecker struct { + lock sync.Mutex + progressNotifySupported *bool + progresNotifyEndpointCache map[string]bool +} + +func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker { + return &defaultFeatureSupportChecker{ + progresNotifyEndpointCache: make(map[string]bool), + } +} + +// Supports can check the featue from anywhere without storage if it was cached before. +func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) (bool, error) { + switch feature { + case storage.RequestWatchProgress: + f.lock.Lock() + defer f.lock.Unlock() + + return ptr.Deref(f.progressNotifySupported, false), nil + default: + return false, fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature) + } +} + +// CheckClient accepts client and calculate the support per endpoint and caches it. +// It will return at any point if error happens or one endpoint is not supported. +func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) error { + switch feature { + case storage.RequestWatchProgress: + return f.clientSupportsRequestWatchProgress(ctx, c) + default: + return fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature) + + } +} + +func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client) error { + f.lock.Lock() + defer f.lock.Unlock() + + for _, ep := range c.Endpoints() { + supported, err := f.supportsProgressNotifyEndpointLocked(ctx, c, ep) + if err != nil { + return err + } + if !supported { + f.progressNotifySupported = ptr.To(false) + return nil + } + } + if f.progressNotifySupported == nil && len(c.Endpoints()) > 0 { + f.progressNotifySupported = ptr.To(true) + } + return nil +} + +func (f *defaultFeatureSupportChecker) supportsProgressNotifyEndpointLocked(ctx context.Context, c client, ep string) (bool, error) { + if supported, ok := f.progresNotifyEndpointCache[ep]; ok { + return supported, nil + } + + supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep) + if err != nil { + return false, err + } + + f.progresNotifyEndpointCache[ep] = supported + return supported, nil +} + +// Sub interface of etcd client. +type client interface { + // Endpoints returns list of endpoints in etcd client. + Endpoints() []string + // Status retrieves the status information from the etcd client connected to the specified endpoint. + // It takes a context.Context parameter for cancellation or timeout control. + // It returns a clientv3.StatusResponse containing the status information or an error if the operation fails. + Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) +} + +// endpointSupportsRequestWatchProgress evaluates whether RequestWatchProgress supported by current version of etcd endpoint. +// Based on this issues: +// - https://github.com/etcd-io/etcd/issues/15220 - Fixed in etcd v3.4.25+ and v3.5.8+ +// - https://github.com/etcd-io/etcd/issues/17507 - Fixed in etcd v3.4.31+ and v3.5.13+ +func endpointSupportsRequestWatchProgress(ctx context.Context, c client, endpoint string) (bool, error) { + resp, err := c.Status(ctx, endpoint) + if err != nil { + return false, fmt.Errorf("failed checking etcd version, endpoint: %q: %w", endpoint, err) + } + ver, err := version.ParseSemantic(resp.Version) + if err != nil { + // Assume feature is not supported if etcd version cannot be parsed. + klog.ErrorS(err, "Failed to parse etcd version", "version", resp.Version) + return false, nil + } + if ver.LessThan(v3_4_31) || ver.AtLeast(v3_5_0) && ver.LessThan(v3_5_13) { + return false, nil + } + return true, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker_test.go new file mode 100644 index 00000000000..c800dfd5733 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker_test.go @@ -0,0 +1,272 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package feature + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/apiserver/pkg/storage" +) + +type mockEndpointVersion struct { + Endpoint string + Version string + Error error +} + +// MockEtcdClient is a mock implementation of the EtcdClientInterface interface. +type MockEtcdClient struct { + EndpointVersion []mockEndpointVersion +} + +func (m MockEtcdClient) getEndpoints() []string { + var endpoints []string + for _, ev := range m.EndpointVersion { + endpoints = append(endpoints, ev.Endpoint) + } + return endpoints +} + +func (m MockEtcdClient) getVersion(endpoint string) (string, error) { + for _, ev := range m.EndpointVersion { + if ev.Endpoint == endpoint { + return ev.Version, ev.Error + } + } + // Never should happen, unless tests having a problem. + return "", fmt.Errorf("No version found") +} + +func (m *MockEtcdClient) Endpoints() []string { + return m.getEndpoints() +} + +// Status returns a mock status response. +func (m *MockEtcdClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) { + version, err := m.getVersion(endpoint) + if err != nil { + return nil, err + } + // Return a mock status response + return &clientv3.StatusResponse{ + Version: version, + }, nil +} + +func TestSupports(t *testing.T) { + tests := []struct { + testName string + featureName string + expectedResult bool + expectedError error + }{ + { + testName: "Error with unknown feature", + featureName: "some unknown feature", + expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", "some unknown feature"), + }, + { + testName: "Error with empty feature", + featureName: "", + expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", ""), + }, + { + testName: "No error but disabled by default", + featureName: storage.RequestWatchProgress, + expectedResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.testName, func(t *testing.T) { + var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() + + supported, err := testFeatureSupportChecker.Supports(tt.featureName) + + assert.Equal(t, tt.expectedResult, supported) + assert.Equal(t, tt.expectedError, err) + }) + } +} + +func TestSupportsRequestWatchProgress(t *testing.T) { + type testCase struct { + endpointsVersion []mockEndpointVersion + expectedResult bool + expectedError error + } + tests := []struct { + testName string + rounds []testCase + }{ + { + testName: "Disabled - default disabled", + rounds: []testCase{{expectedResult: false}}, + }, + { + testName: "Enabled - supported versions bound", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.4.31", Endpoint: "localhost:2390"}}, + expectedResult: true, + }, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.13", Endpoint: "localhost:2391"}}, + expectedResult: true, + }, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.6.0", Endpoint: "localhost:2392"}}, + expectedResult: true}}, + }, + { + testName: "Disabled - supported versions bound, 3.4.30", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.4.30", Endpoint: "localhost:2390"}}, + expectedResult: false}}, + }, + { + testName: "Disabled - supported versions bound, 3.5.0", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.0", Endpoint: "localhost:2390"}}, + expectedResult: false}}, + }, + { + testName: "Disabled - supported versions bound, 3.5.12", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.12", Endpoint: "localhost:2390"}}, + expectedResult: false}}, + }, + { + testName: "Disabled - disables if called with one client doesn't support it", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.13", Endpoint: "localhost:2390"}, + {Version: "3.5.10", Endpoint: "localhost:2391"}}, + expectedResult: false}}, + }, + { + testName: "Disabled - disables if called with all client doesn't support it", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.9", Endpoint: "localhost:2390"}, + {Version: "3.5.10", Endpoint: "localhost:2391"}}, + expectedResult: false}}, + }, + { + testName: "Enabled - if provided client has at least one endpoint that supports it and no client that doesn't", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.4.31", Endpoint: "localhost:2390"}, + {Version: "3.5.13", Endpoint: "localhost:2391"}, + {Version: "3.5.14", Endpoint: "localhost:2392"}, + {Version: "3.6.0", Endpoint: "localhost:2393"}}, + expectedResult: true}}, + }, + { + testName: "Disabled - cannot be re-enabled", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.4.0", Endpoint: "localhost:2390"}, + {Version: "3.4.1", Endpoint: "localhost:2391"}}, + expectedResult: false}, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.6.0", Endpoint: "localhost:2392"}}, + expectedResult: false}}, + }, + { + testName: "Enabled - one client supports it and later disabled it with second client", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.6.0", Endpoint: "localhost:2390"}, + {Version: "3.5.14", Endpoint: "localhost:2391"}}, + expectedResult: true}, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.4.0", Endpoint: "localhost:2392"}}, + expectedResult: false}}, + }, + { + testName: "Disabled - malformed version would disable the supported cluster and can not be re-enabled again", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.6.0", Endpoint: "localhost:2390"}}, + expectedResult: true, + }, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.4.--aaa", Endpoint: "localhost:2392"}}, + expectedResult: false}, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.13", Endpoint: "localhost:2393"}}, + expectedResult: false}}, + }, + { + testName: "Enabled - error on first client, enabled success on second client", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.6.0", Endpoint: "localhost:2390", Error: fmt.Errorf("some error")}}, + expectedResult: false, + expectedError: fmt.Errorf("failed checking etcd version, endpoint: %q: %w", "localhost:2390", fmt.Errorf("some error")), + }, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.14", Endpoint: "localhost:2391"}}, + expectedResult: true}}, + }, + { + testName: "Disabled - enabled success on first client, error on second client, disabled success on third client", + rounds: []testCase{ + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.6.0", Endpoint: "localhost:2390"}}, + expectedResult: true, + }, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.6.0", Endpoint: "localhost:2391", Error: fmt.Errorf("some error")}}, + expectedResult: true, + expectedError: fmt.Errorf("failed checking etcd version, endpoint: %q: %w", "localhost:2391", fmt.Errorf("some error")), + }, + {endpointsVersion: []mockEndpointVersion{ + {Version: "3.5.10", Endpoint: "localhost:2392"}}, + expectedResult: false}}, + }, + { + testName: "Disabled - client doesn't have any endpoints", + rounds: []testCase{{endpointsVersion: []mockEndpointVersion{}, expectedResult: false}}, + }, + } + for _, tt := range tests { + t.Run(tt.testName, func(t *testing.T) { + var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() + for _, round := range tt.rounds { + // Mock Etcd client + mockClient := &MockEtcdClient{EndpointVersion: round.endpointsVersion} + ctx := context.Background() + + err := testFeatureSupportChecker.CheckClient(ctx, mockClient, storage.RequestWatchProgress) + assert.Equal(t, err, round.expectedError) + + // Error of Supports already tested in TestSupports. + supported, _ := testFeatureSupportChecker.Supports(storage.RequestWatchProgress) + assert.Equal(t, supported, round.expectedResult) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 5489660809d..cc891c6b90f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -29,6 +29,12 @@ import ( "k8s.io/apimachinery/pkg/watch" ) +// Feature is the name of each feature in storage that we check in feature_support_checker. +type Feature = string + +// RequestWatchProgress is an etcd feature that may use to check if it supported or not. +var RequestWatchProgress Feature = "RequestWatchProgress" + // Versioner abstracts setting and retrieving metadata fields from database response // onto the object ot list. It is required to maintain storage invariants - updating an // object twice with the same data except for the ResourceVersion and SelfLink must be