diff --git a/pkg/registry/core/componentstatus/rest.go b/pkg/registry/core/componentstatus/rest.go index dc940a42283..204d1cee22e 100644 --- a/pkg/registry/core/componentstatus/rest.go +++ b/pkg/registry/core/componentstatus/rest.go @@ -38,12 +38,12 @@ import ( ) type REST struct { - GetServersToValidate func() map[string]*Server + GetServersToValidate func() map[string]Server rest.TableConvertor } // NewStorage returns a new REST. -func NewStorage(serverRetriever func() map[string]*Server) *REST { +func NewStorage(serverRetriever func() map[string]Server) *REST { return &REST{ GetServersToValidate: serverRetriever, TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, @@ -83,7 +83,7 @@ func (rs *REST) List(ctx context.Context, options *metainternalversion.ListOptio wait.Add(len(servers)) statuses := make(chan api.ComponentStatus, len(servers)) for k, v := range servers { - go func(name string, server *Server) { + go func(name string, server Server) { defer wait.Done() status := rs.getComponentStatus(name, server) statuses <- *status @@ -153,7 +153,7 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus { } } -func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus { +func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus { status, msg, err := server.DoServerCheck() errorMsg := "" if err != nil { diff --git a/pkg/registry/core/componentstatus/rest_test.go b/pkg/registry/core/componentstatus/rest_test.go index d5fdb1ac306..a34c1586d8f 100644 --- a/pkg/registry/core/componentstatus/rest_test.go +++ b/pkg/registry/core/componentstatus/rest_test.go @@ -59,9 +59,9 @@ func NewTestREST(resp testResponse) *REST { err: resp.err, } return &REST{ - GetServersToValidate: func() map[string]*Server { - return map[string]*Server{ - "test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober}, + GetServersToValidate: func() 
map[string]Server { + return map[string]Server{ + "test1": &HttpServer{Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober}, } }, } diff --git a/pkg/registry/core/componentstatus/validator.go b/pkg/registry/core/componentstatus/validator.go index a38730dc1b5..d271bd7404f 100644 --- a/pkg/registry/core/componentstatus/validator.go +++ b/pkg/registry/core/componentstatus/validator.go @@ -17,12 +17,15 @@ limitations under the License. package componentstatus import ( + "context" "crypto/tls" "fmt" "sync" "time" utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/kubernetes/pkg/probe" httpprober "k8s.io/kubernetes/pkg/probe/http" ) @@ -33,7 +36,11 @@ const ( type ValidatorFn func([]byte) error -type Server struct { +type Server interface { + DoServerCheck() (probe.Result, string, error) +} + +type HttpServer struct { Addr string Port int Path string @@ -57,7 +64,7 @@ type ServerStatus struct { Err string `json:"err,omitempty"` } -func (server *Server) DoServerCheck() (probe.Result, string, error) { +func (server *HttpServer) DoServerCheck() (probe.Result, string, error) { // setup the prober server.Once.Do(func() { if server.Prober != nil { @@ -92,3 +99,23 @@ func (server *Server) DoServerCheck() (probe.Result, string, error) { } return result, data, nil } + +type EtcdServer struct { + storagebackend.Config +} + +func (server *EtcdServer) DoServerCheck() (probe.Result, string, error) { + prober, err := factory.CreateProber(server.Config) + if err != nil { + return probe.Failure, "", err + } + defer prober.Close() + + ctx, cancel := context.WithTimeout(context.Background(), probeTimeOut) + defer cancel() + err = prober.Probe(ctx) + if err != nil { + return probe.Failure, "", err + } + return probe.Success, "", err +} diff --git a/pkg/registry/core/componentstatus/validator_test.go b/pkg/registry/core/componentstatus/validator_test.go index 
9ff62ba7467..b77fd369b55 100644 --- a/pkg/registry/core/componentstatus/validator_test.go +++ b/pkg/registry/core/componentstatus/validator_test.go @@ -49,7 +49,7 @@ func TestValidate(t *testing.T) { {probe.Success, "foo", nil, probe.Success, "foo", false, nil}, } - s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"} + s := HttpServer{Addr: "foo.com", Port: 8080, Path: "/healthz"} for _, test := range tests { fakeProber := &fakeHttpProber{ diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index dca42573995..0c959892699 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -21,12 +21,8 @@ import ( "fmt" "net" "net/http" - "net/url" - "strings" "time" - "k8s.io/klog/v2" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -35,7 +31,6 @@ import ( "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" serverstorage "k8s.io/apiserver/pkg/server/storage" - "k8s.io/apiserver/pkg/storage/etcd3" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1" @@ -69,7 +64,6 @@ import ( serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/serviceaccount" - utilsnet "k8s.io/utils/net" ) // LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but @@ -416,43 +410,16 @@ type componentStatusStorage struct { storageFactory serverstorage.StorageFactory } -func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server { +func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server { // this is fragile, which assumes that the default port is being used // TODO: switch to secure port until these components remove 
the ability to serve insecurely. - serversToValidate := map[string]*componentstatus.Server{ - "controller-manager": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"}, - "scheduler": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"}, + serversToValidate := map[string]componentstatus.Server{ + "controller-manager": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"}, + "scheduler": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"}, } - for ix, machine := range s.storageFactory.Backends() { - etcdUrl, err := url.Parse(machine.Server) - if err != nil { - klog.Errorf("Failed to parse etcd url for validation: %v", err) - continue - } - var port int - var addr string - if strings.Contains(etcdUrl.Host, ":") { - var portString string - addr, portString, err = net.SplitHostPort(etcdUrl.Host) - if err != nil { - klog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err) - continue - } - port, _ = utilsnet.ParsePort(portString, true) - } else { - addr = etcdUrl.Host - port = 2379 - } - // TODO: etcd health checking should be abstracted in the storage tier - serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{ - Addr: addr, - EnableHTTPS: etcdUrl.Scheme == "https", - TLSConfig: machine.TLSConfig, - Port: port, - Path: "/health", - Validate: etcd3.EtcdHealthCheck, - } + for ix, cfg := range s.storageFactory.Configs() { + serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.EtcdServer{Config: cfg} } return serversToValidate } diff --git a/pkg/registry/core/rest/storage_core_test.go 
b/pkg/registry/core/rest/storage_core_test.go index 7e674f17aa3..411efa211dc 100644 --- a/pkg/registry/core/rest/storage_core_test.go +++ b/pkg/registry/core/rest/storage_core_test.go @@ -51,3 +51,7 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) s func (f fakeStorageFactory) Backends() []storage.Backend { return []storage.Backend{{Server: "etcd-0"}} } + +func (f fakeStorageFactory) Configs() []storagebackend.Config { + return []storagebackend.Config{{Transport: storagebackend.TransportConfig{ServerList: []string{"etcd-0"}}}} +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 6aabbf255be..a3b20a4a324 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -444,6 +444,10 @@ func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) str return resource.Group + "/" + resource.Resource } +func (s *SimpleStorageFactory) Configs() []storagebackend.Config { + return serverstorage.Configs(s.StorageConfig) +} + func (s *SimpleStorageFactory) Backends() []serverstorage.Backend { // nothing should ever call this method but we still provide a functional implementation return serverstorage.Backends(s.StorageConfig) @@ -474,6 +478,10 @@ func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource return t.delegate.ResourcePrefix(resource) } +func (t *transformerStorageFactory) Configs() []storagebackend.Config { + return t.delegate.Configs() +} + func (t *transformerStorageFactory) Backends() []serverstorage.Backend { return t.delegate.Backends() } diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go index 5b1c24446c7..1c32b977239 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go +++ 
b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go @@ -22,14 +22,13 @@ import ( "io/ioutil" "strings" - "k8s.io/klog/v2" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage/storagebackend" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog/v2" ) // Backend describes the storage servers, the information here should be enough @@ -52,8 +51,12 @@ type StorageFactory interface { // centralized control over the shape of etcd directories ResourcePrefix(groupResource schema.GroupResource) string + // Configs gets configurations for all registered storage destinations. + Configs() []storagebackend.Config + // Backends gets all backends for all registered storage destinations. // Used for getting all instances for health validations. + // Deprecated: Use Configs instead Backends() []Backend } @@ -276,14 +279,52 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (* return storageConfig.ForResource(groupResource), nil } -// Backends returns all backends for all registered storage destinations. -// Used for getting all instances for health validations. +// Configs implements StorageFactory. +func (s *DefaultStorageFactory) Configs() []storagebackend.Config { + return configs(s.StorageConfig, s.Overrides) +} + +// Configs gets configurations for all registered storage destinations. 
+func Configs(storageConfig storagebackend.Config) []storagebackend.Config { + return configs(storageConfig, nil) +} + +// configs returns all storage configurations, including those for group resource overrides. +func configs(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []storagebackend.Config { + locations := sets.NewString() + configs := []storagebackend.Config{} + for _, loc := range storageConfig.Transport.ServerList { + // copy + newConfig := storageConfig + newConfig.Transport.ServerList = []string{loc} + configs = append(configs, newConfig) + locations.Insert(loc) + } + + for _, override := range grOverrides { + for _, loc := range override.etcdLocation { + if locations.Has(loc) { + continue + } + // copy + newConfig := storageConfig + override.Apply(&newConfig, &StorageCodecConfig{}) + newConfig.Transport.ServerList = []string{loc} + configs = append(configs, newConfig) + locations.Insert(loc) + } + } + return configs +} + +// Backends implements StorageFactory. func (s *DefaultStorageFactory) Backends() []Backend { return backends(s.StorageConfig, s.Overrides) } // Backends returns all backends for all registered storage destinations. // Used for getting all instances for health validations. +// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber. 
func Backends(storageConfig storagebackend.Config) []Backend { return backends(storageConfig, nil) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go index e0f07f24f6c..84f01e09aa2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go @@ -185,3 +185,59 @@ func TestUpdateEtcdOverrides(t *testing.T) { } } + +func TestConfigs(t *testing.T) { + exampleinstall.Install(scheme) + defaultEtcdLocations := []string{"http://127.0.0.1", "http://127.0.0.2"} + + testCases := []struct { + resource schema.GroupResource + servers []string + wantConfigs []storagebackend.Config + }{ + { + wantConfigs: []storagebackend.Config{ + {Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true}, + {Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true}, + }, + }, + { + resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"}, + servers: []string{"http://127.0.0.1:10000"}, + wantConfigs: []storagebackend.Config{ + {Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true}, + {Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true}, + {Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true}, + }, + }, + { + resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"}, + servers: []string{"http://127.0.0.1:10000", "https://127.0.0.1", "http://127.0.0.2"}, + wantConfigs: []storagebackend.Config{ + {Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true}, + 
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true}, + {Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true}, + {Transport: storagebackend.TransportConfig{ServerList: []string{"https://127.0.0.1"}}, Prefix: "/registry", Paging: true}, + }, + }, + } + + for i, test := range testCases { + defaultConfig := storagebackend.Config{ + Prefix: "/registry", + Transport: storagebackend.TransportConfig{ + ServerList: defaultEtcdLocations, + }, + } + storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil) + if len(test.servers) > 0 { + storageFactory.SetEtcdLocation(test.resource, test.servers) + } + + got := storageFactory.Configs() + if !reflect.DeepEqual(test.wantConfigs, got) { + t.Errorf("%d: expected %v, got %v", i, test.wantConfigs, got) + continue + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go index ad051d2d6cd..3d489810378 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go @@ -28,6 +28,7 @@ type etcdHealth struct { } // EtcdHealthCheck decodes data returned from etcd /healthz handler. +// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber. 
func EtcdHealthCheck(data []byte) error { obj := etcdHealth{} if err := json.Unmarshal(data, &obj); err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index c1785964956..64bcabadb97 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -153,18 +153,18 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan // retry in a loop in the background until we successfully create the client, storing the client or error encountered lock := sync.RWMutex{} - var client *clientv3.Client + var prober *etcd3Prober clientErr := fmt.Errorf("etcd client connection not yet established") go wait.PollUntil(time.Second, func() (bool, error) { - newClient, err := newETCD3Client(c.Transport) + newProber, err := newETCD3Prober(c) lock.Lock() defer lock.Unlock() // Ensure that server is already not shutting down. 
select { case <-stopCh: if err == nil { - newClient.Close() + newProber.Close() } return true, nil default: @@ -173,7 +173,7 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan clientErr = err return false, nil } - client = newClient + prober = newProber clientErr = nil return true, nil }, stopCh) @@ -185,8 +185,8 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan lock.Lock() defer lock.Unlock() - if client != nil { - client.Close() + if prober != nil { + prober.Close() clientErr = fmt.Errorf("server is shutting down") } }() @@ -214,17 +214,56 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - // See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118 now := time.Now() - _, err := client.Get(ctx, path.Join("/", c.Prefix, "health")) - if err != nil { - err = fmt.Errorf("error getting data from etcd: %w", err) - } + err := prober.Probe(ctx) lastError.Store(err, now) return err }, nil } +func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) { + client, err := newETCD3Client(c.Transport) + if err != nil { + return nil, err + } + return &etcd3Prober{ + client: client, + prefix: c.Prefix, + }, nil +} + +type etcd3Prober struct { + prefix string + + mux sync.RWMutex + client *clientv3.Client + closed bool +} + +func (p *etcd3Prober) Close() error { + p.mux.Lock() + defer p.mux.Unlock() + if !p.closed { + p.closed = true + return p.client.Close() + } + return fmt.Errorf("prober was closed") +} + +func (p *etcd3Prober) Probe(ctx context.Context) error { + p.mux.RLock() + defer p.mux.RUnlock() + if p.closed { + return fmt.Errorf("prober was closed") + } + // See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118 + _, err := p.client.Get(ctx, 
path.Join("/", p.prefix, "health")) + if err != nil { + return fmt.Errorf("error getting data from etcd: %w", err) + } + return nil +} + var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { tlsInfo := transport.TLSInfo{ CertFile: c.CertFile, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go index 4c8a409d659..c8cdd19b97a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go @@ -17,6 +17,7 @@ limitations under the License. package factory import ( + "context" "fmt" "k8s.io/apimachinery/pkg/runtime" @@ -61,3 +62,20 @@ func CreateReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() e return nil, fmt.Errorf("unknown storage type: %s", c.Type) } } + +func CreateProber(c storagebackend.Config) (Prober, error) { + switch c.Type { + case storagebackend.StorageTypeETCD2: + return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) + case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: + return newETCD3Prober(c) + default: + return nil, fmt.Errorf("unknown storage type: %s", c.Type) + } +} + +// Prober is an interface that defines the Probe function for doing etcd readiness/liveness checks. +type Prober interface { + Probe(ctx context.Context) error + Close() error +}