mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 21:36:24 +00:00
Make etcd component status consistent with health probes
Co-authored-by: Antonio Ojea <antonio.ojea.garcia@gmail.com>
This commit is contained in:
parent
eca1f9d2d5
commit
a60314c47e
@ -38,12 +38,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type REST struct {
|
type REST struct {
|
||||||
GetServersToValidate func() map[string]*Server
|
GetServersToValidate func() map[string]Server
|
||||||
rest.TableConvertor
|
rest.TableConvertor
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStorage returns a new REST.
|
// NewStorage returns a new REST.
|
||||||
func NewStorage(serverRetriever func() map[string]*Server) *REST {
|
func NewStorage(serverRetriever func() map[string]Server) *REST {
|
||||||
return &REST{
|
return &REST{
|
||||||
GetServersToValidate: serverRetriever,
|
GetServersToValidate: serverRetriever,
|
||||||
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
|
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
|
||||||
@ -83,7 +83,7 @@ func (rs *REST) List(ctx context.Context, options *metainternalversion.ListOptio
|
|||||||
wait.Add(len(servers))
|
wait.Add(len(servers))
|
||||||
statuses := make(chan api.ComponentStatus, len(servers))
|
statuses := make(chan api.ComponentStatus, len(servers))
|
||||||
for k, v := range servers {
|
for k, v := range servers {
|
||||||
go func(name string, server *Server) {
|
go func(name string, server Server) {
|
||||||
defer wait.Done()
|
defer wait.Done()
|
||||||
status := rs.getComponentStatus(name, server)
|
status := rs.getComponentStatus(name, server)
|
||||||
statuses <- *status
|
statuses <- *status
|
||||||
@ -153,7 +153,7 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus {
|
func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus {
|
||||||
status, msg, err := server.DoServerCheck()
|
status, msg, err := server.DoServerCheck()
|
||||||
errorMsg := ""
|
errorMsg := ""
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -59,9 +59,9 @@ func NewTestREST(resp testResponse) *REST {
|
|||||||
err: resp.err,
|
err: resp.err,
|
||||||
}
|
}
|
||||||
return &REST{
|
return &REST{
|
||||||
GetServersToValidate: func() map[string]*Server {
|
GetServersToValidate: func() map[string]Server {
|
||||||
return map[string]*Server{
|
return map[string]Server{
|
||||||
"test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
|
"test1": &HttpServer{Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -17,12 +17,15 @@ limitations under the License.
|
|||||||
package componentstatus
|
package componentstatus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
|
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||||
"k8s.io/kubernetes/pkg/probe"
|
"k8s.io/kubernetes/pkg/probe"
|
||||||
httpprober "k8s.io/kubernetes/pkg/probe/http"
|
httpprober "k8s.io/kubernetes/pkg/probe/http"
|
||||||
)
|
)
|
||||||
@ -33,7 +36,11 @@ const (
|
|||||||
|
|
||||||
type ValidatorFn func([]byte) error
|
type ValidatorFn func([]byte) error
|
||||||
|
|
||||||
type Server struct {
|
type Server interface {
|
||||||
|
DoServerCheck() (probe.Result, string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type HttpServer struct {
|
||||||
Addr string
|
Addr string
|
||||||
Port int
|
Port int
|
||||||
Path string
|
Path string
|
||||||
@ -57,7 +64,7 @@ type ServerStatus struct {
|
|||||||
Err string `json:"err,omitempty"`
|
Err string `json:"err,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (server *Server) DoServerCheck() (probe.Result, string, error) {
|
func (server *HttpServer) DoServerCheck() (probe.Result, string, error) {
|
||||||
// setup the prober
|
// setup the prober
|
||||||
server.Once.Do(func() {
|
server.Once.Do(func() {
|
||||||
if server.Prober != nil {
|
if server.Prober != nil {
|
||||||
@ -92,3 +99,23 @@ func (server *Server) DoServerCheck() (probe.Result, string, error) {
|
|||||||
}
|
}
|
||||||
return result, data, nil
|
return result, data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type EtcdServer struct {
|
||||||
|
storagebackend.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func (server *EtcdServer) DoServerCheck() (probe.Result, string, error) {
|
||||||
|
prober, err := factory.CreateProber(server.Config)
|
||||||
|
if err != nil {
|
||||||
|
return probe.Failure, "", err
|
||||||
|
}
|
||||||
|
defer prober.Close()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), probeTimeOut)
|
||||||
|
defer cancel()
|
||||||
|
err = prober.Probe(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return probe.Failure, "", err
|
||||||
|
}
|
||||||
|
return probe.Success, "", err
|
||||||
|
}
|
||||||
|
@ -49,7 +49,7 @@ func TestValidate(t *testing.T) {
|
|||||||
{probe.Success, "foo", nil, probe.Success, "foo", false, nil},
|
{probe.Success, "foo", nil, probe.Success, "foo", false, nil},
|
||||||
}
|
}
|
||||||
|
|
||||||
s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}
|
s := HttpServer{Addr: "foo.com", Port: 8080, Path: "/healthz"}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
fakeProber := &fakeHttpProber{
|
fakeProber := &fakeHttpProber{
|
||||||
|
@ -21,12 +21,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
@ -35,7 +31,6 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
|
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
|
||||||
@ -69,7 +64,6 @@ import (
|
|||||||
serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
|
serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
|
||||||
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||||
utilsnet "k8s.io/utils/net"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
|
// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
|
||||||
@ -416,43 +410,16 @@ type componentStatusStorage struct {
|
|||||||
storageFactory serverstorage.StorageFactory
|
storageFactory serverstorage.StorageFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server {
|
func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server {
|
||||||
// this is fragile, which assumes that the default port is being used
|
// this is fragile, which assumes that the default port is being used
|
||||||
// TODO: switch to secure port until these components remove the ability to serve insecurely.
|
// TODO: switch to secure port until these components remove the ability to serve insecurely.
|
||||||
serversToValidate := map[string]*componentstatus.Server{
|
serversToValidate := map[string]componentstatus.Server{
|
||||||
"controller-manager": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
|
"controller-manager": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
|
||||||
"scheduler": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
|
"scheduler": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
|
||||||
}
|
}
|
||||||
|
|
||||||
for ix, machine := range s.storageFactory.Backends() {
|
for ix, cfg := range s.storageFactory.Configs() {
|
||||||
etcdUrl, err := url.Parse(machine.Server)
|
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.EtcdServer{Config: cfg}
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to parse etcd url for validation: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var port int
|
|
||||||
var addr string
|
|
||||||
if strings.Contains(etcdUrl.Host, ":") {
|
|
||||||
var portString string
|
|
||||||
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
port, _ = utilsnet.ParsePort(portString, true)
|
|
||||||
} else {
|
|
||||||
addr = etcdUrl.Host
|
|
||||||
port = 2379
|
|
||||||
}
|
|
||||||
// TODO: etcd health checking should be abstracted in the storage tier
|
|
||||||
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{
|
|
||||||
Addr: addr,
|
|
||||||
EnableHTTPS: etcdUrl.Scheme == "https",
|
|
||||||
TLSConfig: machine.TLSConfig,
|
|
||||||
Port: port,
|
|
||||||
Path: "/health",
|
|
||||||
Validate: etcd3.EtcdHealthCheck,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return serversToValidate
|
return serversToValidate
|
||||||
}
|
}
|
||||||
|
@ -51,3 +51,7 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) s
|
|||||||
func (f fakeStorageFactory) Backends() []storage.Backend {
|
func (f fakeStorageFactory) Backends() []storage.Backend {
|
||||||
return []storage.Backend{{Server: "etcd-0"}}
|
return []storage.Backend{{Server: "etcd-0"}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f fakeStorageFactory) Configs() []storagebackend.Config {
|
||||||
|
return []storagebackend.Config{{Transport: storagebackend.TransportConfig{ServerList: []string{"etcd-0"}}}}
|
||||||
|
}
|
||||||
|
@ -444,6 +444,10 @@ func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) str
|
|||||||
return resource.Group + "/" + resource.Resource
|
return resource.Group + "/" + resource.Resource
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SimpleStorageFactory) Configs() []storagebackend.Config {
|
||||||
|
return serverstorage.Configs(s.StorageConfig)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SimpleStorageFactory) Backends() []serverstorage.Backend {
|
func (s *SimpleStorageFactory) Backends() []serverstorage.Backend {
|
||||||
// nothing should ever call this method but we still provide a functional implementation
|
// nothing should ever call this method but we still provide a functional implementation
|
||||||
return serverstorage.Backends(s.StorageConfig)
|
return serverstorage.Backends(s.StorageConfig)
|
||||||
@ -474,6 +478,10 @@ func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource
|
|||||||
return t.delegate.ResourcePrefix(resource)
|
return t.delegate.ResourcePrefix(resource)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *transformerStorageFactory) Configs() []storagebackend.Config {
|
||||||
|
return t.delegate.Configs()
|
||||||
|
}
|
||||||
|
|
||||||
func (t *transformerStorageFactory) Backends() []serverstorage.Backend {
|
func (t *transformerStorageFactory) Backends() []serverstorage.Backend {
|
||||||
return t.delegate.Backends()
|
return t.delegate.Backends()
|
||||||
}
|
}
|
||||||
|
@ -22,14 +22,13 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Backend describes the storage servers, the information here should be enough
|
// Backend describes the storage servers, the information here should be enough
|
||||||
@ -52,8 +51,12 @@ type StorageFactory interface {
|
|||||||
// centralized control over the shape of etcd directories
|
// centralized control over the shape of etcd directories
|
||||||
ResourcePrefix(groupResource schema.GroupResource) string
|
ResourcePrefix(groupResource schema.GroupResource) string
|
||||||
|
|
||||||
|
// Configs gets configurations for all of registered storage destinations.
|
||||||
|
Configs() []storagebackend.Config
|
||||||
|
|
||||||
// Backends gets all backends for all registered storage destinations.
|
// Backends gets all backends for all registered storage destinations.
|
||||||
// Used for getting all instances for health validations.
|
// Used for getting all instances for health validations.
|
||||||
|
// Deprecated: Use Configs instead
|
||||||
Backends() []Backend
|
Backends() []Backend
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -276,14 +279,52 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
|
|||||||
return storageConfig.ForResource(groupResource), nil
|
return storageConfig.ForResource(groupResource), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backends returns all backends for all registered storage destinations.
|
// Configs implements StorageFactory.
|
||||||
// Used for getting all instances for health validations.
|
func (s *DefaultStorageFactory) Configs() []storagebackend.Config {
|
||||||
|
return configs(s.StorageConfig, s.Overrides)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configs gets configurations for all of registered storage destinations.
|
||||||
|
func Configs(storageConfig storagebackend.Config) []storagebackend.Config {
|
||||||
|
return configs(storageConfig, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns all storage configurations including those for group resource overrides
|
||||||
|
func configs(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []storagebackend.Config {
|
||||||
|
locations := sets.NewString()
|
||||||
|
configs := []storagebackend.Config{}
|
||||||
|
for _, loc := range storageConfig.Transport.ServerList {
|
||||||
|
// copy
|
||||||
|
newConfig := storageConfig
|
||||||
|
newConfig.Transport.ServerList = []string{loc}
|
||||||
|
configs = append(configs, newConfig)
|
||||||
|
locations.Insert(loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, override := range grOverrides {
|
||||||
|
for _, loc := range override.etcdLocation {
|
||||||
|
if locations.Has(loc) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// copy
|
||||||
|
newConfig := storageConfig
|
||||||
|
override.Apply(&newConfig, &StorageCodecConfig{})
|
||||||
|
newConfig.Transport.ServerList = []string{loc}
|
||||||
|
configs = append(configs, newConfig)
|
||||||
|
locations.Insert(loc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return configs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backends implements StorageFactory.
|
||||||
func (s *DefaultStorageFactory) Backends() []Backend {
|
func (s *DefaultStorageFactory) Backends() []Backend {
|
||||||
return backends(s.StorageConfig, s.Overrides)
|
return backends(s.StorageConfig, s.Overrides)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backends returns all backends for all registered storage destinations.
|
// Backends returns all backends for all registered storage destinations.
|
||||||
// Used for getting all instances for health validations.
|
// Used for getting all instances for health validations.
|
||||||
|
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
|
||||||
func Backends(storageConfig storagebackend.Config) []Backend {
|
func Backends(storageConfig storagebackend.Config) []Backend {
|
||||||
return backends(storageConfig, nil)
|
return backends(storageConfig, nil)
|
||||||
}
|
}
|
||||||
|
@ -185,3 +185,59 @@ func TestUpdateEtcdOverrides(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConfigs(t *testing.T) {
|
||||||
|
exampleinstall.Install(scheme)
|
||||||
|
defaultEtcdLocations := []string{"http://127.0.0.1", "http://127.0.0.2"}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
resource schema.GroupResource
|
||||||
|
servers []string
|
||||||
|
wantConfigs []storagebackend.Config
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
wantConfigs: []storagebackend.Config{
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"},
|
||||||
|
servers: []string{"http://127.0.0.1:10000"},
|
||||||
|
wantConfigs: []storagebackend.Config{
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"},
|
||||||
|
servers: []string{"http://127.0.0.1:10000", "https://127.0.0.1", "http://127.0.0.2"},
|
||||||
|
wantConfigs: []storagebackend.Config{
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true},
|
||||||
|
{Transport: storagebackend.TransportConfig{ServerList: []string{"https://127.0.0.1"}}, Prefix: "/registry", Paging: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range testCases {
|
||||||
|
defaultConfig := storagebackend.Config{
|
||||||
|
Prefix: "/registry",
|
||||||
|
Transport: storagebackend.TransportConfig{
|
||||||
|
ServerList: defaultEtcdLocations,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil)
|
||||||
|
if len(test.servers) > 0 {
|
||||||
|
storageFactory.SetEtcdLocation(test.resource, test.servers)
|
||||||
|
}
|
||||||
|
|
||||||
|
got := storageFactory.Configs()
|
||||||
|
if !reflect.DeepEqual(test.wantConfigs, got) {
|
||||||
|
t.Errorf("%d: expected %v, got %v", i, test.wantConfigs, got)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -28,6 +28,7 @@ type etcdHealth struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// EtcdHealthCheck decodes data returned from etcd /healthz handler.
|
// EtcdHealthCheck decodes data returned from etcd /healthz handler.
|
||||||
|
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
|
||||||
func EtcdHealthCheck(data []byte) error {
|
func EtcdHealthCheck(data []byte) error {
|
||||||
obj := etcdHealth{}
|
obj := etcdHealth{}
|
||||||
if err := json.Unmarshal(data, &obj); err != nil {
|
if err := json.Unmarshal(data, &obj); err != nil {
|
||||||
|
@ -153,18 +153,18 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||||||
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
|
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
|
||||||
|
|
||||||
lock := sync.RWMutex{}
|
lock := sync.RWMutex{}
|
||||||
var client *clientv3.Client
|
var prober *etcd3Prober
|
||||||
clientErr := fmt.Errorf("etcd client connection not yet established")
|
clientErr := fmt.Errorf("etcd client connection not yet established")
|
||||||
|
|
||||||
go wait.PollUntil(time.Second, func() (bool, error) {
|
go wait.PollUntil(time.Second, func() (bool, error) {
|
||||||
newClient, err := newETCD3Client(c.Transport)
|
newProber, err := newETCD3Prober(c)
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
// Ensure that server is already not shutting down.
|
// Ensure that server is already not shutting down.
|
||||||
select {
|
select {
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
if err == nil {
|
if err == nil {
|
||||||
newClient.Close()
|
newProber.Close()
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
default:
|
default:
|
||||||
@ -173,7 +173,7 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||||||
clientErr = err
|
clientErr = err
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
client = newClient
|
prober = newProber
|
||||||
clientErr = nil
|
clientErr = nil
|
||||||
return true, nil
|
return true, nil
|
||||||
}, stopCh)
|
}, stopCh)
|
||||||
@ -185,8 +185,8 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||||||
|
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
if client != nil {
|
if prober != nil {
|
||||||
client.Close()
|
prober.Close()
|
||||||
clientErr = fmt.Errorf("server is shutting down")
|
clientErr = fmt.Errorf("server is shutting down")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -214,17 +214,56 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
|||||||
}
|
}
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
|
err := prober.Probe(ctx)
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("error getting data from etcd: %w", err)
|
|
||||||
}
|
|
||||||
lastError.Store(err, now)
|
lastError.Store(err, now)
|
||||||
return err
|
return err
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
|
||||||
|
client, err := newETCD3Client(c.Transport)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &etcd3Prober{
|
||||||
|
client: client,
|
||||||
|
prefix: c.Prefix,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type etcd3Prober struct {
|
||||||
|
prefix string
|
||||||
|
|
||||||
|
mux sync.RWMutex
|
||||||
|
client *clientv3.Client
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *etcd3Prober) Close() error {
|
||||||
|
p.mux.Lock()
|
||||||
|
defer p.mux.Unlock()
|
||||||
|
if !p.closed {
|
||||||
|
p.closed = true
|
||||||
|
return p.client.Close()
|
||||||
|
}
|
||||||
|
return fmt.Errorf("prober was closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *etcd3Prober) Probe(ctx context.Context) error {
|
||||||
|
p.mux.RLock()
|
||||||
|
defer p.mux.RUnlock()
|
||||||
|
if p.closed {
|
||||||
|
return fmt.Errorf("prober was closed")
|
||||||
|
}
|
||||||
|
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
|
||||||
|
_, err := p.client.Get(ctx, path.Join("/", p.prefix, "health"))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error getting data from etcd: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||||
tlsInfo := transport.TLSInfo{
|
tlsInfo := transport.TLSInfo{
|
||||||
CertFile: c.CertFile,
|
CertFile: c.CertFile,
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package factory
|
package factory
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -61,3 +62,20 @@ func CreateReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() e
|
|||||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CreateProber(c storagebackend.Config) (Prober, error) {
|
||||||
|
switch c.Type {
|
||||||
|
case storagebackend.StorageTypeETCD2:
|
||||||
|
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||||
|
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||||
|
return newETCD3Prober(c)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prober is an interface that defines the Probe function for doing etcd readiness/liveness checks.
|
||||||
|
type Prober interface {
|
||||||
|
Probe(ctx context.Context) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user