mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
Move metrics to etcd3 and clean up
This commit is contained in:
parent
81c8552d7e
commit
a06f2cfc3f
@ -44,7 +44,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
storeerr "k8s.io/apiserver/pkg/storage/errors"
|
||||
"k8s.io/apiserver/pkg/storage/etcd/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/util/dryrun"
|
||||
|
||||
"k8s.io/klog"
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
|
||||
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
|
||||
"k8s.io/apiserver/pkg/server/mux"
|
||||
etcdmetrics "k8s.io/apiserver/pkg/storage/etcd/metrics"
|
||||
etcd3metrics "k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
@ -47,7 +47,7 @@ func (m MetricsWithReset) Install(c *mux.PathRecorderMux) {
|
||||
c.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
|
||||
if req.Method == "DELETE" {
|
||||
apimetrics.Reset()
|
||||
etcdmetrics.Reset()
|
||||
etcd3metrics.Reset()
|
||||
io.WriteString(w, "metrics reset\n")
|
||||
return
|
||||
}
|
||||
@ -58,5 +58,5 @@ func (m MetricsWithReset) Install(c *mux.PathRecorderMux) {
|
||||
// register apiserver and etcd metrics
|
||||
func register() {
|
||||
apimetrics.Register()
|
||||
etcdmetrics.Register()
|
||||
etcd3metrics.Register()
|
||||
}
|
||||
|
@ -19,69 +19,8 @@ package util
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
)
|
||||
|
||||
// IsEtcdNotFound returns true if and only if err is an etcd not found error.
|
||||
func IsEtcdNotFound(err error) bool {
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeKeyNotFound)
|
||||
}
|
||||
|
||||
// IsEtcdNodeExist returns true if and only if err is an etcd node already exist error.
|
||||
func IsEtcdNodeExist(err error) bool {
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeNodeExist)
|
||||
}
|
||||
|
||||
// IsEtcdTestFailed returns true if and only if err is an etcd write conflict.
|
||||
func IsEtcdTestFailed(err error) bool {
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeTestFailed)
|
||||
}
|
||||
|
||||
// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired.
|
||||
func IsEtcdWatchExpired(err error) bool {
|
||||
// NOTE: This seems weird why it wouldn't be etcd.ErrorCodeWatcherCleared
|
||||
// I'm using the previous matching value
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeEventIndexCleared)
|
||||
}
|
||||
|
||||
// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached.
|
||||
func IsEtcdUnreachable(err error) bool {
|
||||
// NOTE: The logic has changed previous error code no longer applies
|
||||
return err == etcd.ErrClusterUnavailable
|
||||
}
|
||||
|
||||
// isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode
|
||||
func isEtcdErrorNum(err error, errorCode int) bool {
|
||||
if err != nil {
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
return etcdError.Code == errorCode
|
||||
}
|
||||
// NOTE: There are other error types returned
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetEtcdVersion performs a version check against the provided Etcd server,
|
||||
// returning the string response, and error (if any).
|
||||
func GetEtcdVersion(host string) (string, error) {
|
||||
response, err := http.Get(host + "/version")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
|
||||
}
|
||||
versionBytes, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(versionBytes), nil
|
||||
}
|
||||
|
||||
type etcdHealth struct {
|
||||
// Note this has to be public so the json library can modify it.
|
||||
Health string `json:"health"`
|
||||
|
@ -17,79 +17,9 @@ limitations under the License.
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const validEtcdVersion = "etcd 2.0.9"
|
||||
|
||||
func TestIsEtcdNotFound(t *testing.T) {
|
||||
try := func(err error, isNotFound bool) {
|
||||
if IsEtcdNotFound(err) != isNotFound {
|
||||
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
|
||||
}
|
||||
}
|
||||
try(&etcd.Error{Code: 101}, false)
|
||||
try(nil, false)
|
||||
try(fmt.Errorf("some other kind of error"), false)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_ValidVersion(t *testing.T) {
|
||||
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprint(w, validEtcdVersion)
|
||||
}))
|
||||
defer testServer.Close()
|
||||
|
||||
var version string
|
||||
var err error
|
||||
if version, err = GetEtcdVersion(testServer.URL); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
assert.Equal(t, validEtcdVersion, version, "Unexpected version")
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_ErrorStatus(t *testing.T) {
|
||||
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, "", http.StatusServiceUnavailable)
|
||||
}))
|
||||
defer testServer.Close()
|
||||
|
||||
_, err := GetEtcdVersion(testServer.URL)
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_NotListening(t *testing.T) {
|
||||
portIsOpen := func(port int) bool {
|
||||
conn, err := net.DialTimeout("tcp", "127.0.0.1:"+strconv.Itoa(port), 1*time.Second)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
port := rand.Intn((1 << 16) - 1)
|
||||
for tried := 0; portIsOpen(port); tried++ {
|
||||
if tried >= 10 {
|
||||
t.Fatal("Couldn't find a closed TCP port to continue testing")
|
||||
}
|
||||
port++
|
||||
}
|
||||
|
||||
_, err := GetEtcdVersion("http://127.0.0.1:" + strconv.Itoa(port))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestEtcdHealthCheck(t *testing.T) {
|
||||
tests := []struct {
|
||||
data string
|
||||
|
@ -62,15 +62,18 @@ func Register() {
|
||||
})
|
||||
}
|
||||
|
||||
// UpdateObjectCount sets the etcd_object_counts metric.
|
||||
func UpdateObjectCount(resourcePrefix string, count int64) {
|
||||
objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
|
||||
}
|
||||
|
||||
// RecordEtcdRequestLatency sets the etcd_request_duration_seconds metrics.
|
||||
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
|
||||
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
|
||||
deprecatedEtcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(sinceInMicroseconds(startTime))
|
||||
}
|
||||
|
||||
// Reset resets the etcd_request_duration_seconds metric.
|
||||
func Reset() {
|
||||
etcdRequestLatency.Reset()
|
||||
|
@ -39,7 +39,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd"
|
||||
"k8s.io/apiserver/pkg/storage/etcd/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utiltrace "k8s.io/utils/trace"
|
||||
|
Loading…
Reference in New Issue
Block a user