update etcd packages to v3.5.4

in e9e8be3 we updated etcd server components to 3.5.4, here let's update the vendor/ as well to match

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
This commit is contained in:
Davanum Srinivas
2022-06-11 17:33:39 -04:00
parent 867b5cc31b
commit 4672bfa26f
54 changed files with 775 additions and 383 deletions

View File

@@ -68,6 +68,9 @@ type Backend interface {
Defrag() error
ForceCommit()
Close() error
// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())
}
type Snapshot interface {
@@ -100,8 +103,9 @@ type backend struct {
// mlock prevents backend database file to be swapped
mlock bool
mu sync.RWMutex
db *bolt.DB
mu sync.RWMutex
bopts *bolt.Options
db *bolt.DB
batchInterval time.Duration
batchLimit int
@@ -119,6 +123,9 @@ type backend struct {
hooks Hooks
// txPostLockInsideApplyHook is called each time right after locking the tx.
txPostLockInsideApplyHook func()
lg *zap.Logger
}
@@ -185,7 +192,8 @@ func newBackend(bcfg BackendConfig) *backend {
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend{
db: db,
bopts: bopts,
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
@@ -229,6 +237,14 @@ func (b *backend) BatchTx() BatchTx {
return b.batchTx
}
func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
// It needs to lock the batchTx, because the periodic commit
// may be accessing the txPostLockInsideApplyHook at the moment.
b.batchTx.lock()
defer b.batchTx.Unlock()
b.txPostLockInsideApplyHook = hook
}
func (b *backend) ReadTx() ReadTx { return b.readTx }
// ConcurrentReadTx creates and returns a new ReadTx, which:
@@ -432,11 +448,13 @@ func (b *backend) Defrag() error {
func (b *backend) defrag() error {
now := time.Now()
isDefragActive.Set(1)
defer isDefragActive.Set(0)
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.Lock()
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()
// lock database after lock tx to avoid deadlock.
@@ -509,13 +527,7 @@ func (b *backend) defrag() error {
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
}
defragmentedBoltOptions := bolt.Options{}
if boltOpenOptions != nil {
defragmentedBoltOptions = *boltOpenOptions
}
defragmentedBoltOptions.Mlock = b.mlock
b.db, err = bolt.Open(dbp, 0600, &defragmentedBoltOptions)
b.db, err = bolt.Open(dbp, 0600, b.bopts)
if err != nil {
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
}

View File

@@ -53,6 +53,8 @@ type BatchTx interface {
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
LockInsideApply()
LockOutsideApply()
}
type batchTx struct {
@@ -63,10 +65,34 @@ type batchTx struct {
pending int
}
// Lock is supposed to be called only by the unit test.
func (t *batchTx) Lock() {
ValidateCalledInsideUnittest(t.backend.lg)
t.lock()
}
func (t *batchTx) lock() {
t.Mutex.Lock()
}
func (t *batchTx) LockInsideApply() {
t.lock()
if t.backend.txPostLockInsideApplyHook != nil {
// The callers of some methods (i.e., (*RaftCluster).AddMember)
// can be coming from both InsideApply and OutsideApply, but the
// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
// So we should check the txPostLockInsideApplyHook before validating
// the callstack.
ValidateCalledInsideApply(t.backend.lg)
t.backend.txPostLockInsideApplyHook()
}
}
func (t *batchTx) LockOutsideApply() {
ValidateCalledOutSideApply(t.backend.lg)
t.lock()
}
func (t *batchTx) Unlock() {
if t.pending >= t.backend.batchLimit {
t.commit(false)
@@ -214,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.Lock()
t.lock()
t.commit(false)
t.Unlock()
}
// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
t.lock()
t.commit(true)
t.Unlock()
}
@@ -291,13 +317,13 @@ func (t *batchTxBuffered) Unlock() {
}
func (t *batchTxBuffered) Commit() {
t.Lock()
t.lock()
t.commit(false)
t.Unlock()
}
func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
t.lock()
t.commit(true)
t.Unlock()
}

View File

@@ -83,6 +83,13 @@ var (
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})
isDefragActive = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "defrag_inflight",
Help: "Whether or not defrag is active on the member. 1 means active, 0 means not.",
})
)
func init() {
@@ -92,4 +99,5 @@ func init() {
prometheus.MustRegister(writeSec)
prometheus.MustRegister(defragSec)
prometheus.MustRegister(snapshotTransferSec)
prometheus.MustRegister(isDefragActive)
}

View File

@@ -0,0 +1,70 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"os"
"runtime/debug"
"strings"
"go.uber.org/zap"
)
const (
ENV_VERIFY = "ETCD_VERIFY"
ENV_VERIFY_ALL_VALUE = "all"
ENV_VERIFY_LOCK = "lock"
)
func ValidateCalledInsideApply(lg *zap.Logger) {
if !verifyLockEnabled() {
return
}
if !insideApply() {
lg.Panic("Called outside of APPLY!", zap.Stack("stacktrace"))
}
}
func ValidateCalledOutSideApply(lg *zap.Logger) {
if !verifyLockEnabled() {
return
}
if insideApply() {
lg.Panic("Called inside of APPLY!", zap.Stack("stacktrace"))
}
}
func ValidateCalledInsideUnittest(lg *zap.Logger) {
if !verifyLockEnabled() {
return
}
if !insideUnittest() {
lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace"))
}
}
func verifyLockEnabled() bool {
return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
}
func insideApply() bool {
stackTraceStr := string(debug.Stack())
return strings.Contains(stackTraceStr, ".applyEntries")
}
func insideUnittest() bool {
stackTraceStr := string(debug.Stack())
return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
}