diff --git a/cluster/images/etcd/rollback/rollback.go b/cluster/images/etcd/rollback/rollback.go index 76cdb066f8d..5ae0e821a8a 100644 --- a/cluster/images/etcd/rollback/rollback.go +++ b/cluster/images/etcd/rollback/rollback.go @@ -26,6 +26,8 @@ import ( "strings" "time" + wal237 "k8s.io/kubernetes/third_party/forked/etcd237/wal" + "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" @@ -42,7 +44,7 @@ import ( "github.com/golang/glog" ) -const rollbackVersion = "2.3.7" +const rollbackVersion = "2.2.0" var ( migrateDatadir = flag.String("data-dir", "", "Path to the data directory") @@ -116,7 +118,7 @@ func main() { } walDir := path.Join(*migrateDatadir, "member", "wal") - w, err := wal.Create(walDir, metadata) + w, err := wal237.Create(walDir, metadata) if err != nil { glog.Fatal(err) } diff --git a/third_party/forked/etcd237/README.md b/third_party/forked/etcd237/README.md new file mode 100644 index 00000000000..821ef39d8c3 --- /dev/null +++ b/third_party/forked/etcd237/README.md @@ -0,0 +1 @@ +Forked from etcd 2.3 release branch to support migration from 3.0 WAL to 2.3 WAL format diff --git a/third_party/forked/etcd237/pkg/fileutil/BUILD b/third_party/forked/etcd237/pkg/fileutil/BUILD new file mode 100644 index 00000000000..38be7fe2eb3 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/BUILD @@ -0,0 +1,36 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = [ + "fileutil_test.go", + "lock_test.go", + "preallocate_test.go", + "purge_test.go", + ], + library = "go_default_library", + tags = ["automanaged"], + deps = [], +) + +go_library( + name = "go_default_library", + srcs = [ + "fileutil.go", + "lock.go", + "lock_unix.go", + "preallocate.go", + "purge.go", + "sync_linux.go", + ], + tags = ["automanaged"], + deps = ["//vendor:github.com/coreos/pkg/capnslog"], +) diff --git a/third_party/forked/etcd237/pkg/fileutil/fileutil.go b/third_party/forked/etcd237/pkg/fileutil/fileutil.go new file mode 100644 index 00000000000..145886a1a03 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/fileutil.go @@ -0,0 +1,75 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package fileutil implements utility functions related to files and paths. +package fileutil + +import ( + "io/ioutil" + "os" + "path" + "sort" + + "github.com/coreos/pkg/capnslog" +) + +const ( + privateFileMode = 0600 + // owner can make/remove files inside the directory + privateDirMode = 0700 +) + +var ( + plog = capnslog.NewPackageLogger("github.com/coreos/etcd/pkg", "fileutil") +) + +// IsDirWriteable checks if dir is writable by writing and removing a file +// to dir. It returns nil if dir is writable. +func IsDirWriteable(dir string) error { + f := path.Join(dir, ".touch") + if err := ioutil.WriteFile(f, []byte(""), privateFileMode); err != nil { + return err + } + return os.Remove(f) +} + +// ReadDir returns the filenames in the given directory in sorted order. +func ReadDir(dirpath string) ([]string, error) { + dir, err := os.Open(dirpath) + if err != nil { + return nil, err + } + defer dir.Close() + names, err := dir.Readdirnames(-1) + if err != nil { + return nil, err + } + sort.Strings(names) + return names, nil +} + +// TouchDirAll is similar to os.MkdirAll. It creates directories with 0700 permission if any directory +// does not exists. TouchDirAll also ensures the given directory is writable. +func TouchDirAll(dir string) error { + err := os.MkdirAll(dir, privateDirMode) + if err != nil && err != os.ErrExist { + return err + } + return IsDirWriteable(dir) +} + +func Exist(name string) bool { + _, err := os.Stat(name) + return err == nil +} diff --git a/third_party/forked/etcd237/pkg/fileutil/fileutil_test.go b/third_party/forked/etcd237/pkg/fileutil/fileutil_test.go new file mode 100644 index 00000000000..b2c644110f6 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/fileutil_test.go @@ -0,0 +1,96 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "io/ioutil" + "os" + "os/user" + "path/filepath" + "reflect" + "testing" +) + +func TestIsDirWriteable(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected ioutil.TempDir error: %v", err) + } + defer os.RemoveAll(tmpdir) + if err = IsDirWriteable(tmpdir); err != nil { + t.Fatalf("unexpected IsDirWriteable error: %v", err) + } + if err = os.Chmod(tmpdir, 0444); err != nil { + t.Fatalf("unexpected os.Chmod error: %v", err) + } + me, err := user.Current() + if err != nil { + // err can be non-nil when cross compiled + // http://stackoverflow.com/questions/20609415/cross-compiling-user-current-not-implemented-on-linux-amd64 + t.Skipf("failed to get current user: %v", err) + } + if me.Name == "root" || me.Name == "Administrator" { + // ideally we should check CAP_DAC_OVERRIDE. + // but it does not matter for tests. + t.Skipf("running as a superuser") + } + if err := IsDirWriteable(tmpdir); err == nil { + t.Fatalf("expected IsDirWriteable to error") + } +} + +func TestReadDir(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "") + defer os.RemoveAll(tmpdir) + if err != nil { + t.Fatalf("unexpected ioutil.TempDir error: %v", err) + } + files := []string{"def", "abc", "xyz", "ghi"} + for _, f := range files { + var fh *os.File + fh, err = os.Create(filepath.Join(tmpdir, f)) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + if err = fh.Close(); err != nil { + t.Fatalf("error closing file: %v", err) + } + } + fs, err := ReadDir(tmpdir) + if err != nil { + t.Fatalf("error calling ReadDir: %v", err) + } + wfs := []string{"abc", "def", "ghi", "xyz"} + if !reflect.DeepEqual(fs, wfs) { + t.Fatalf("ReadDir: got %v, want %v", fs, wfs) + } +} + +func TestExist(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "fileutil") + if err != nil { + t.Fatal(err) + } + f.Close() + + if g := Exist(f.Name()); g != true { + t.Errorf("exist = %v, want true", g) + } + + os.Remove(f.Name()) + if g := Exist(f.Name()); g != false { + t.Errorf("exist = %v, want false", g) + } +} diff --git a/third_party/forked/etcd237/pkg/fileutil/lock.go b/third_party/forked/etcd237/pkg/fileutil/lock.go new file mode 100644 index 00000000000..bf411d3a179 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/lock.go @@ -0,0 +1,29 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +type Lock interface { + // Name returns the name of the file. + Name() string + // TryLock acquires exclusivity on the lock without blocking. + TryLock() error + // Lock acquires exclusivity on the lock. + Lock() error + // Unlock unlocks the lock. + Unlock() error + // Destroy should be called after Unlock to clean up + // the resources. + Destroy() error +} diff --git a/third_party/forked/etcd237/pkg/fileutil/lock_plan9.go b/third_party/forked/etcd237/pkg/fileutil/lock_plan9.go new file mode 100644 index 00000000000..bd2bc867645 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/lock_plan9.go @@ -0,0 +1,79 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "errors" + "os" + "syscall" + "time" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type lock struct { + fname string + file *os.File +} + +func (l *lock) Name() string { + return l.fname +} + +func (l *lock) TryLock() error { + err := os.Chmod(l.fname, syscall.DMEXCL|0600) + if err != nil { + return err + } + + f, err := os.Open(l.fname) + if err != nil { + return ErrLocked + } + + l.file = f + return nil +} + +func (l *lock) Lock() error { + err := os.Chmod(l.fname, syscall.DMEXCL|0600) + if err != nil { + return err + } + + for { + f, err := os.Open(l.fname) + if err == nil { + l.file = f + return nil + } + time.Sleep(10 * time.Millisecond) + } +} + +func (l *lock) Unlock() error { + return l.file.Close() +} + +func (l *lock) Destroy() error { + return nil +} + +func NewLock(file string) (Lock, error) { + l := &lock{fname: file} + return l, nil +} diff --git a/third_party/forked/etcd237/pkg/fileutil/lock_solaris.go b/third_party/forked/etcd237/pkg/fileutil/lock_solaris.go new file mode 100644 index 00000000000..e3b0a017683 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/lock_solaris.go @@ -0,0 +1,87 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build solaris + +package fileutil + +import ( + "errors" + "os" + "syscall" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type lock struct { + fd int + file *os.File +} + +func (l *lock) Name() string { + return l.file.Name() +} + +func (l *lock) TryLock() error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Pid = 0 + lock.Type = syscall.F_WRLCK + lock.Whence = 0 + lock.Pid = 0 + err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock) + if err != nil && err == syscall.EAGAIN { + return ErrLocked + } + return err +} + +func (l *lock) Lock() error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Type = syscall.F_WRLCK + lock.Whence = 0 + lock.Pid = 0 + return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock) +} + +func (l *lock) Unlock() error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Type = syscall.F_UNLCK + lock.Whence = 0 + err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock) + if err != nil && err == syscall.EAGAIN { + return ErrLocked + } + return err +} + +func (l *lock) Destroy() error { + return l.file.Close() +} + +func NewLock(file string) (Lock, error) { + f, err := os.OpenFile(file, os.O_WRONLY, 0600) + if err != nil { + return nil, err + } + l := &lock{int(f.Fd()), f} + return l, nil +} diff --git a/third_party/forked/etcd237/pkg/fileutil/lock_test.go b/third_party/forked/etcd237/pkg/fileutil/lock_test.go new file mode 100644 index 00000000000..abd4a1037c8 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/lock_test.go @@ -0,0 +1,96 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "io/ioutil" + "os" + "testing" + "time" +) + +func TestLockAndUnlock(t *testing.T) { + f, err := ioutil.TempFile("", "lock") + if err != nil { + t.Fatal(err) + } + f.Close() + defer func() { + err = os.Remove(f.Name()) + if err != nil { + t.Fatal(err) + } + }() + + // lock the file + l, err := NewLock(f.Name()) + if err != nil { + t.Fatal(err) + } + defer l.Destroy() + err = l.Lock() + if err != nil { + t.Fatal(err) + } + + // try lock a locked file + dupl, err := NewLock(f.Name()) + if err != nil { + t.Fatal(err) + } + err = dupl.TryLock() + if err != ErrLocked { + t.Errorf("err = %v, want %v", err, ErrLocked) + } + + // unlock the file + err = l.Unlock() + if err != nil { + t.Fatal(err) + } + + // try lock the unlocked file + err = dupl.TryLock() + if err != nil { + t.Errorf("err = %v, want %v", err, nil) + } + defer dupl.Destroy() + + // blocking on locked file + locked := make(chan struct{}, 1) + go func() { + l.Lock() + locked <- struct{}{} + }() + + select { + case <-locked: + t.Error("unexpected unblocking") + case <-time.After(100 * time.Millisecond): + } + + // unlock + err = dupl.Unlock() + if err != nil { + t.Fatal(err) + } + + // the previously blocked routine should be unblocked + select { + case <-locked: + case <-time.After(1 * time.Second): + t.Error("unexpected blocking") + } +} diff --git a/third_party/forked/etcd237/pkg/fileutil/lock_unix.go b/third_party/forked/etcd237/pkg/fileutil/lock_unix.go new file mode 100644 index 00000000000..4f90e42aced --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/lock_unix.go @@ -0,0 +1,65 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows,!plan9,!solaris + +package fileutil + +import ( + "errors" + "os" + "syscall" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type lock struct { + fd int + file *os.File +} + +func (l *lock) Name() string { + return l.file.Name() +} + +func (l *lock) TryLock() error { + err := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil && err == syscall.EWOULDBLOCK { + return ErrLocked + } + return err +} + +func (l *lock) Lock() error { + return syscall.Flock(l.fd, syscall.LOCK_EX) +} + +func (l *lock) Unlock() error { + return syscall.Flock(l.fd, syscall.LOCK_UN) +} + +func (l *lock) Destroy() error { + return l.file.Close() +} + +func NewLock(file string) (Lock, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + l := &lock{int(f.Fd()), f} + return l, nil +} diff --git a/third_party/forked/etcd237/pkg/fileutil/lock_windows.go b/third_party/forked/etcd237/pkg/fileutil/lock_windows.go new file mode 100644 index 00000000000..ddca9a66959 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/lock_windows.go @@ -0,0 +1,60 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package fileutil + +import ( + "errors" + "os" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type lock struct { + fd int + file *os.File +} + +func (l *lock) Name() string { + return l.file.Name() +} + +func (l *lock) TryLock() error { + return nil +} + +func (l *lock) Lock() error { + return nil +} + +func (l *lock) Unlock() error { + return nil +} + +func (l *lock) Destroy() error { + return l.file.Close() +} + +func NewLock(file string) (Lock, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + l := &lock{int(f.Fd()), f} + return l, nil +} diff --git a/third_party/forked/etcd237/pkg/fileutil/perallocate_unsupported.go b/third_party/forked/etcd237/pkg/fileutil/perallocate_unsupported.go new file mode 100644 index 00000000000..c1a952bb796 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/perallocate_unsupported.go @@ -0,0 +1,28 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !linux + +package fileutil + +import "os" + +// Preallocate tries to allocate the space for given +// file. This operation is only supported on linux by a +// few filesystems (btrfs, ext4, etc.). +// If the operation is unsupported, no error will be returned. +// Otherwise, the error encountered will be returned. +func Preallocate(f *os.File, sizeInBytes int) error { + return nil +} diff --git a/third_party/forked/etcd237/pkg/fileutil/preallocate.go b/third_party/forked/etcd237/pkg/fileutil/preallocate.go new file mode 100644 index 00000000000..c4bd4f4c815 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/preallocate.go @@ -0,0 +1,42 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package fileutil + +import ( + "os" + "syscall" +) + +// Preallocate tries to allocate the space for given +// file. This operation is only supported on linux by a +// few filesystems (btrfs, ext4, etc.). +// If the operation is unsupported, no error will be returned. +// Otherwise, the error encountered will be returned. +func Preallocate(f *os.File, sizeInBytes int) error { + // use mode = 1 to keep size + // see FALLOC_FL_KEEP_SIZE + err := syscall.Fallocate(int(f.Fd()), 1, 0, int64(sizeInBytes)) + if err != nil { + errno, ok := err.(syscall.Errno) + // treat not support as nil error + if ok && errno == syscall.ENOTSUP { + return nil + } + return err + } + return nil +} diff --git a/third_party/forked/etcd237/pkg/fileutil/preallocate_test.go b/third_party/forked/etcd237/pkg/fileutil/preallocate_test.go new file mode 100644 index 00000000000..d5f2a71f304 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/preallocate_test.go @@ -0,0 +1,53 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "io/ioutil" + "os" + "runtime" + "testing" +) + +func TestPreallocate(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skipf("skip testPreallocate, OS = %s", runtime.GOOS) + } + + p, err := ioutil.TempDir(os.TempDir(), "preallocateTest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + f, err := ioutil.TempFile(p, "") + if err != nil { + t.Fatal(err) + } + + size := 64 * 1000 + err = Preallocate(f, size) + if err != nil { + t.Fatal(err) + } + + stat, err := f.Stat() + if err != nil { + t.Fatal(err) + } + if stat.Size() != 0 { + t.Errorf("size = %d, want %d", stat.Size(), 0) + } +} diff --git a/third_party/forked/etcd237/pkg/fileutil/purge.go b/third_party/forked/etcd237/pkg/fileutil/purge.go new file mode 100644 index 00000000000..375aa971974 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/purge.go @@ -0,0 +1,80 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "os" + "path" + "sort" + "strings" + "time" +) + +func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error { + errC := make(chan error, 1) + go func() { + for { + fnames, err := ReadDir(dirname) + if err != nil { + errC <- err + return + } + newfnames := make([]string, 0) + for _, fname := range fnames { + if strings.HasSuffix(fname, suffix) { + newfnames = append(newfnames, fname) + } + } + sort.Strings(newfnames) + for len(newfnames) > int(max) { + f := path.Join(dirname, newfnames[0]) + l, err := NewLock(f) + if err != nil { + errC <- err + return + } + err = l.TryLock() + if err != nil { + break + } + err = os.Remove(f) + if err != nil { + errC <- err + return + } + err = l.Unlock() + if err != nil { + plog.Errorf("error unlocking %s when purging file (%v)", l.Name(), err) + errC <- err + return + } + err = l.Destroy() + if err != nil { + plog.Errorf("error destroying lock %s when purging file (%v)", l.Name(), err) + errC <- err + return + } + plog.Infof("purged file %s successfully", f) + newfnames = newfnames[1:] + } + select { + case <-time.After(interval): + case <-stop: + return + } + } + }() + return errC +} diff --git a/third_party/forked/etcd237/pkg/fileutil/purge_test.go b/third_party/forked/etcd237/pkg/fileutil/purge_test.go new file mode 100644 index 00000000000..b11a4b05f20 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/purge_test.go @@ -0,0 +1,161 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "reflect" + "testing" + "time" +) + +func TestPurgeFile(t *testing.T) { + dir, err := ioutil.TempDir("", "purgefile") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + for i := 0; i < 5; i++ { + _, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + } + + stop := make(chan struct{}) + + // keep at most 3 most recent files + errch := PurgeFile(dir, "test", 3, time.Millisecond, stop) + + // create 5 more files + for i := 5; i < 10; i++ { + _, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + // purge routine should purge 7 out of 10 files and only keep the + // 3 most recent ones. + // wait for purging for at most 100ms. + var fnames []string + for i := 0; i < 10; i++ { + fnames, err = ReadDir(dir) + if err != nil { + t.Fatal(err) + } + if len(fnames) <= 3 { + break + } + time.Sleep(10 * time.Millisecond) + } + wnames := []string{"7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + + // no error should be reported from purge routine + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + close(stop) +} + +func TestPurgeFileHoldingLock(t *testing.T) { + dir, err := ioutil.TempDir("", "purgefile") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + for i := 0; i < 10; i++ { + _, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + } + + // create a purge barrier at 5 + l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5))) + err = l.Lock() + if err != nil { + t.Fatal(err) + } + + stop := make(chan struct{}) + errch := PurgeFile(dir, "test", 3, time.Millisecond, stop) + + var fnames []string + for i := 0; i < 10; i++ { + fnames, err = ReadDir(dir) + if err != nil { + t.Fatal(err) + } + if len(fnames) <= 5 { + break + } + time.Sleep(10 * time.Millisecond) + } + wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + + select { + case err = <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + + // remove the purge barrier + err = l.Unlock() + if err != nil { + t.Fatal(err) + } + err = l.Destroy() + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + fnames, err = ReadDir(dir) + if err != nil { + t.Fatal(err) + } + if len(fnames) <= 3 { + break + } + time.Sleep(10 * time.Millisecond) + } + wnames = []string{"7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + + close(stop) +} diff --git a/third_party/forked/etcd237/pkg/fileutil/sync.go b/third_party/forked/etcd237/pkg/fileutil/sync.go new file mode 100644 index 00000000000..cd7fff08f64 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/sync.go @@ -0,0 +1,26 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !linux + +package fileutil + +import "os" + +// Fdatasync is similar to fsync(), but does not flush modified metadata +// unless that metadata is needed in order to allow a subsequent data retrieval +// to be correctly handled. +func Fdatasync(f *os.File) error { + return f.Sync() +} diff --git a/third_party/forked/etcd237/pkg/fileutil/sync_linux.go b/third_party/forked/etcd237/pkg/fileutil/sync_linux.go new file mode 100644 index 00000000000..14c4b4808e3 --- /dev/null +++ b/third_party/forked/etcd237/pkg/fileutil/sync_linux.go @@ -0,0 +1,29 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package fileutil + +import ( + "os" + "syscall" +) + +// Fdatasync is similar to fsync(), but does not flush modified metadata +// unless that metadata is needed in order to allow a subsequent data retrieval +// to be correctly handled. +func Fdatasync(f *os.File) error { + return syscall.Fdatasync(int(f.Fd())) +} diff --git a/third_party/forked/etcd237/wal/BUILD b/third_party/forked/etcd237/wal/BUILD new file mode 100644 index 00000000000..bd7f9cd3094 --- /dev/null +++ b/third_party/forked/etcd237/wal/BUILD @@ -0,0 +1,32 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "decoder.go", + "doc.go", + "encoder.go", + "metrics.go", + "multi_readcloser.go", + "repair.go", + "util.go", + "wal.go", + ], + tags = ["automanaged"], + deps = [ + "//third_party/forked/etcd237/pkg/fileutil:go_default_library", + "//vendor:github.com/coreos/etcd/pkg/crc", + "//vendor:github.com/coreos/etcd/pkg/pbutil", + "//vendor:github.com/coreos/etcd/raft/raftpb", + "//vendor:github.com/coreos/etcd/wal/walpb", + "//vendor:github.com/coreos/pkg/capnslog", + "//vendor:github.com/prometheus/client_golang/prometheus", + ], +) diff --git a/third_party/forked/etcd237/wal/decoder.go b/third_party/forked/etcd237/wal/decoder.go new file mode 100644 index 00000000000..f75c919fba6 --- /dev/null +++ b/third_party/forked/etcd237/wal/decoder.go @@ -0,0 +1,103 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bufio" + "encoding/binary" + "hash" + "io" + "sync" + + "github.com/coreos/etcd/pkg/crc" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" +) + +type decoder struct { + mu sync.Mutex + br *bufio.Reader + + c io.Closer + crc hash.Hash32 +} + +func newDecoder(rc io.ReadCloser) *decoder { + return &decoder{ + br: bufio.NewReader(rc), + c: rc, + crc: crc.New(0, crcTable), + } +} + +func (d *decoder) decode(rec *walpb.Record) error { + d.mu.Lock() + defer d.mu.Unlock() + + rec.Reset() + l, err := readInt64(d.br) + if err != nil { + return err + } + data := make([]byte, l) + if _, err = io.ReadFull(d.br, data); err != nil { + // ReadFull returns io.EOF only if no bytes were read + // the decoder should treat this as an ErrUnexpectedEOF instead. + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return err + } + if err := rec.Unmarshal(data); err != nil { + return err + } + // skip crc checking if the record type is crcType + if rec.Type == crcType { + return nil + } + d.crc.Write(rec.Data) + return rec.Validate(d.crc.Sum32()) +} + +func (d *decoder) updateCRC(prevCrc uint32) { + d.crc = crc.New(prevCrc, crcTable) +} + +func (d *decoder) lastCRC() uint32 { + return d.crc.Sum32() +} + +func (d *decoder) close() error { + return d.c.Close() +} + +func mustUnmarshalEntry(d []byte) raftpb.Entry { + var e raftpb.Entry + pbutil.MustUnmarshal(&e, d) + return e +} + +func mustUnmarshalState(d []byte) raftpb.HardState { + var s raftpb.HardState + pbutil.MustUnmarshal(&s, d) + return s +} + +func readInt64(r io.Reader) (int64, error) { + var n int64 + err := binary.Read(r, binary.LittleEndian, &n) + return n, err +} diff --git a/third_party/forked/etcd237/wal/doc.go b/third_party/forked/etcd237/wal/doc.go new file mode 100644 index 00000000000..769b522f040 --- /dev/null +++ b/third_party/forked/etcd237/wal/doc.go @@ -0,0 +1,68 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package wal provides an implementation of a write ahead log that is used by +etcd. + +A WAL is created at a particular directory and is made up of a number of +segmented WAL files. Inside of each file the raft state and entries are appended +to it with the Save method: + + metadata := []byte{} + w, err := wal.Create("/var/lib/etcd", metadata) + ... + err := w.Save(s, ents) + +After saving an raft snapshot to disk, SaveSnapshot method should be called to +record it. So WAL can match with the saved snapshot when restarting. + + err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2}) + +When a user has finished using a WAL it must be closed: + + w.Close() + +WAL files are placed inside of the directory in the following format: +$seq-$index.wal + +The first WAL file to be created will be 0000000000000000-0000000000000000.wal +indicating an initial sequence of 0 and an initial raft index of 0. The first +entry written to WAL MUST have raft index 0. + +WAL will cuts its current wal files if its size exceeds 8MB. This will increment an internal +sequence number and cause a new file to be created. If the last raft index saved +was 0x20 and this is the first time cut has been called on this WAL then the sequence will +increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal. +If a second cut issues 0x10 entries with incremental index later then the file will be called: +0000000000000002-0000000000000031.wal. + +At a later time a WAL can be opened at a particular snapshot. If there is no +snapshot, an empty snapshot should be passed in. + + w, err := wal.Open("/var/lib/etcd", walpb.Snapshot{Index: 10, Term: 2}) + ... + +The snapshot must have been written to the WAL. + +Additional items cannot be Saved to this WAL until all of the items from the given +snapshot to the end of the WAL are read first: + + metadata, state, ents, err := w.ReadAll() + +This will give you the metadata, the last raft.State and the slice of +raft.Entry items in the log. + +*/ +package wal diff --git a/third_party/forked/etcd237/wal/encoder.go b/third_party/forked/etcd237/wal/encoder.go new file mode 100644 index 00000000000..f5b73fe12b4 --- /dev/null +++ b/third_party/forked/etcd237/wal/encoder.go @@ -0,0 +1,89 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bufio" + "encoding/binary" + "hash" + "io" + "sync" + + "github.com/coreos/etcd/pkg/crc" + "github.com/coreos/etcd/wal/walpb" +) + +type encoder struct { + mu sync.Mutex + bw *bufio.Writer + + crc hash.Hash32 + buf []byte + uint64buf []byte +} + +func newEncoder(w io.Writer, prevCrc uint32) *encoder { + return &encoder{ + bw: bufio.NewWriter(w), + crc: crc.New(prevCrc, crcTable), + // 1MB buffer + buf: make([]byte, 1024*1024), + uint64buf: make([]byte, 8), + } +} + +func (e *encoder) encode(rec *walpb.Record) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.crc.Write(rec.Data) + rec.Crc = e.crc.Sum32() + var ( + data []byte + err error + n int + ) + + if rec.Size() > len(e.buf) { + data, err = rec.Marshal() + if err != nil { + return err + } + } else { + n, err = rec.MarshalTo(e.buf) + if err != nil { + return err + } + data = e.buf[:n] + } + if err = writeInt64(e.bw, int64(len(data)), e.uint64buf); err != nil { + return err + } + _, err = e.bw.Write(data) + return err +} + +func (e *encoder) flush() error { + e.mu.Lock() + defer e.mu.Unlock() + return e.bw.Flush() +} + +func writeInt64(w io.Writer, n int64, buf []byte) error { + // http://golang.org/src/encoding/binary/binary.go + binary.LittleEndian.PutUint64(buf, uint64(n)) + _, err := w.Write(buf) + return err +} diff --git a/third_party/forked/etcd237/wal/metrics.go b/third_party/forked/etcd237/wal/metrics.go new file mode 100644 index 00000000000..ed270fac634 --- /dev/null +++ b/third_party/forked/etcd237/wal/metrics.go @@ -0,0 +1,38 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import "github.com/prometheus/client_golang/prometheus" + +var ( + syncDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "wal", + Name: "fsync_durations_seconds", + Help: "The latency distributions of fsync called by wal.", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), + }) + lastIndexSaved = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "wal", + Name: "last_index_saved", + Help: "The index of the last entry saved by wal.", + }) +) + +func init() { + prometheus.MustRegister(syncDurations) + prometheus.MustRegister(lastIndexSaved) +} diff --git a/third_party/forked/etcd237/wal/multi_readcloser.go b/third_party/forked/etcd237/wal/multi_readcloser.go new file mode 100644 index 00000000000..513c6d17d94 --- /dev/null +++ b/third_party/forked/etcd237/wal/multi_readcloser.go @@ -0,0 +1,45 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import "io" + +type multiReadCloser struct { + closers []io.Closer + reader io.Reader +} + +func (mc *multiReadCloser) Close() error { + var err error + for i := range mc.closers { + err = mc.closers[i].Close() + } + return err +} + +func (mc *multiReadCloser) Read(p []byte) (int, error) { + return mc.reader.Read(p) +} + +func MultiReadCloser(readClosers ...io.ReadCloser) io.ReadCloser { + cs := make([]io.Closer, len(readClosers)) + rs := make([]io.Reader, len(readClosers)) + for i := range readClosers { + cs[i] = readClosers[i] + rs[i] = readClosers[i] + } + r := io.MultiReader(rs...) + return &multiReadCloser{cs, r} +} diff --git a/third_party/forked/etcd237/wal/repair.go b/third_party/forked/etcd237/wal/repair.go new file mode 100644 index 00000000000..95a62596de4 --- /dev/null +++ b/third_party/forked/etcd237/wal/repair.go @@ -0,0 +1,107 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "io" + "os" + "path" + + "k8s.io/kubernetes/third_party/forked/etcd237/pkg/fileutil" + + "github.com/coreos/etcd/wal/walpb" +) + +// Repair tries to repair ErrUnexpectedEOF in the +// last wal file by truncating. +func Repair(dirpath string) bool { + f, err := openLast(dirpath) + if err != nil { + return false + } + defer f.Close() + + n := 0 + rec := &walpb.Record{} + + decoder := newDecoder(f) + defer decoder.close() + for { + err := decoder.decode(rec) + switch err { + case nil: + n += 8 + rec.Size() + // update crc of the decoder when necessary + switch rec.Type { + case crcType: + crc := decoder.crc.Sum32() + // current crc of decoder must match the crc of the record. + // do no need to match 0 crc, since the decoder is a new one at this case. + if crc != 0 && rec.Validate(crc) != nil { + return false + } + decoder.updateCRC(rec.Crc) + } + continue + case io.EOF: + return true + case io.ErrUnexpectedEOF: + plog.Noticef("repairing %v", f.Name()) + bf, bferr := os.Create(f.Name() + ".broken") + if bferr != nil { + plog.Errorf("could not repair %v, failed to create backup file", f.Name()) + return false + } + defer bf.Close() + + if _, err = f.Seek(0, os.SEEK_SET); err != nil { + plog.Errorf("could not repair %v, failed to read file", f.Name()) + return false + } + + if _, err = io.Copy(bf, f); err != nil { + plog.Errorf("could not repair %v, failed to copy file", f.Name()) + return false + } + + if err = f.Truncate(int64(n)); err != nil { + plog.Errorf("could not repair %v, failed to truncate file", f.Name()) + return false + } + if err = f.Sync(); err != nil { + plog.Errorf("could not repair %v, failed to sync file", f.Name()) + return false + } + return true + default: + plog.Errorf("could not repair error (%v)", err) + return false + } + } +} + +// openLast opens the last wal file for read and write. +func openLast(dirpath string) (*os.File, error) { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return nil, err + } + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrFileNotFound + } + last := path.Join(dirpath, names[len(names)-1]) + return os.OpenFile(last, os.O_RDWR, 0) +} diff --git a/third_party/forked/etcd237/wal/util.go b/third_party/forked/etcd237/wal/util.go new file mode 100644 index 00000000000..54699656550 --- /dev/null +++ b/third_party/forked/etcd237/wal/util.go @@ -0,0 +1,93 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "errors" + "fmt" + "strings" + + "k8s.io/kubernetes/third_party/forked/etcd237/pkg/fileutil" +) + +var ( + badWalName = errors.New("bad wal name") +) + +func Exist(dirpath string) bool { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return false + } + return len(names) != 0 +} + +// searchIndex returns the last array index of names whose raft index section is +// equal to or smaller than the given index. +// The given names MUST be sorted. +func searchIndex(names []string, index uint64) (int, bool) { + for i := len(names) - 1; i >= 0; i-- { + name := names[i] + _, curIndex, err := parseWalName(name) + if err != nil { + plog.Panicf("parse correct name should never fail: %v", err) + } + if index >= curIndex { + return i, true + } + } + return -1, false +} + +// names should have been sorted based on sequence number. +// isValidSeq checks whether seq increases continuously. +func isValidSeq(names []string) bool { + var lastSeq uint64 + for _, name := range names { + curSeq, _, err := parseWalName(name) + if err != nil { + plog.Panicf("parse correct name should never fail: %v", err) + } + if lastSeq != 0 && lastSeq != curSeq-1 { + return false + } + lastSeq = curSeq + } + return true +} + +func checkWalNames(names []string) []string { + wnames := make([]string, 0) + for _, name := range names { + if _, _, err := parseWalName(name); err != nil { + plog.Warningf("ignored file %v in wal", name) + continue + } + wnames = append(wnames, name) + } + return wnames +} + +func parseWalName(str string) (seq, index uint64, err error) { + if !strings.HasSuffix(str, ".wal") { + return 0, 0, badWalName + } + _, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index) + return seq, index, err +} + +func walName(seq, index uint64) string { + return fmt.Sprintf("%016x-%016x.wal", seq, index) +} diff --git a/third_party/forked/etcd237/wal/wal.go b/third_party/forked/etcd237/wal/wal.go new file mode 100644 index 00000000000..ee51728e97f --- /dev/null +++ b/third_party/forked/etcd237/wal/wal.go @@ -0,0 +1,571 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "errors" + "fmt" + "hash/crc32" + "io" + "os" + "path" + "reflect" + "sync" + "time" + + "k8s.io/kubernetes/third_party/forked/etcd237/pkg/fileutil" + + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" + "github.com/coreos/pkg/capnslog" +) + +const ( + metadataType int64 = iota + 1 + entryType + stateType + crcType + snapshotType + + // the owner can make/remove files inside the directory + privateDirMode = 0700 + + // the expected size of each wal segment file. + // the actual size might be bigger than it. + segmentSizeBytes = 64 * 1000 * 1000 // 64MB +) + +var ( + plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "wal") + + ErrMetadataConflict = errors.New("wal: conflicting metadata found") + ErrFileNotFound = errors.New("wal: file not found") + ErrCRCMismatch = errors.New("wal: crc mismatch") + ErrSnapshotMismatch = errors.New("wal: snapshot mismatch") + ErrSnapshotNotFound = errors.New("wal: snapshot not found") + crcTable = crc32.MakeTable(crc32.Castagnoli) +) + +// WAL is a logical representation of the stable storage. +// WAL is either in read mode or append mode but not both. +// A newly created WAL is in append mode, and ready for appending records. +// A just opened WAL is in read mode, and ready for reading records. +// The WAL will be ready for appending after reading out all the previous records. +type WAL struct { + dir string // the living directory of the underlay files + metadata []byte // metadata recorded at the head of each WAL + state raftpb.HardState // hardstate recorded at the head of WAL + + start walpb.Snapshot // snapshot to start reading + decoder *decoder // decoder to decode records + + mu sync.Mutex + f *os.File // underlay file opened for appending, sync + seq uint64 // sequence of the wal file currently used for writes + enti uint64 // index of the last entry saved to the wal + encoder *encoder // encoder to encode records + + locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing) +} + +// Create creates a WAL ready for appending records. The given metadata is +// recorded at the head of each WAL file, and can be retrieved with ReadAll. +func Create(dirpath string, metadata []byte) (*WAL, error) { + if Exist(dirpath) { + return nil, os.ErrExist + } + + if err := os.MkdirAll(dirpath, privateDirMode); err != nil { + return nil, err + } + + p := path.Join(dirpath, walName(0, 0)) + f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) + if err != nil { + return nil, err + } + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return nil, err + } + if err = l.Lock(); err != nil { + return nil, err + } + + w := &WAL{ + dir: dirpath, + metadata: metadata, + seq: 0, + f: f, + encoder: newEncoder(f, 0), + } + w.locks = append(w.locks, l) + if err := w.saveCrc(0); err != nil { + return nil, err + } + if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { + return nil, err + } + if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil { + return nil, err + } + return w, nil +} + +// Open opens the WAL at the given snap. +// The snap SHOULD have been previously saved to the WAL, or the following +// ReadAll will fail. +// The returned WAL is ready to read and the first record will be the one after +// the given snap. The WAL cannot be appended to before reading out all of its +// previous records. +func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { + return openAtIndex(dirpath, snap, true) +} + +// OpenForRead only opens the wal files for read. +// Write on a read only wal panics. +func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) { + return openAtIndex(dirpath, snap, false) +} + +func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return nil, err + } + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrFileNotFound + } + + nameIndex, ok := searchIndex(names, snap.Index) + if !ok || !isValidSeq(names[nameIndex:]) { + return nil, ErrFileNotFound + } + + // open the wal files for reading + rcs := make([]io.ReadCloser, 0) + ls := make([]fileutil.Lock, 0) + for _, name := range names[nameIndex:] { + f, err := os.Open(path.Join(dirpath, name)) + if err != nil { + return nil, err + } + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return nil, err + } + err = l.TryLock() + if err != nil { + if write { + return nil, err + } + } + rcs = append(rcs, f) + ls = append(ls, l) + } + rc := MultiReadCloser(rcs...) + + // create a WAL ready for reading + w := &WAL{ + dir: dirpath, + start: snap, + decoder: newDecoder(rc), + locks: ls, + } + + if write { + // open the last wal file for appending + seq, _, err := parseWalName(names[len(names)-1]) + if err != nil { + rc.Close() + return nil, err + } + last := path.Join(dirpath, names[len(names)-1]) + + f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0) + if err != nil { + rc.Close() + return nil, err + } + err = fileutil.Preallocate(f, segmentSizeBytes) + if err != nil { + rc.Close() + plog.Errorf("failed to allocate space when creating new wal file (%v)", err) + return nil, err + } + + w.f = f + w.seq = seq + } + + return w, nil +} + +// ReadAll reads out records of the current WAL. +// If opened in write mode, it must read out all records until EOF. Or an error +// will be returned. +// If opened in read mode, it will try to read all records if possible. +// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. +// If loaded snap doesn't match with the expected one, it will return +// all the records and error ErrSnapshotMismatch. +// TODO: detect not-last-snap error. +// TODO: maybe loose the checking of match. +// After ReadAll, the WAL will be ready for appending new records. +func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + rec := &walpb.Record{} + decoder := w.decoder + + var match bool + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case entryType: + e := mustUnmarshalEntry(rec.Data) + if e.Index > w.start.Index { + ents = append(ents[:e.Index-w.start.Index-1], e) + } + w.enti = e.Index + case stateType: + state = mustUnmarshalState(rec.Data) + case metadataType: + if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) { + state.Reset() + return nil, state, nil, ErrMetadataConflict + } + metadata = rec.Data + case crcType: + crc := decoder.crc.Sum32() + // current crc of decoder must match the crc of the record. + // do no need to match 0 crc, since the decoder is a new one at this case. + if crc != 0 && rec.Validate(crc) != nil { + state.Reset() + return nil, state, nil, ErrCRCMismatch + } + decoder.updateCRC(rec.Crc) + case snapshotType: + var snap walpb.Snapshot + pbutil.MustUnmarshal(&snap, rec.Data) + if snap.Index == w.start.Index { + if snap.Term != w.start.Term { + state.Reset() + return nil, state, nil, ErrSnapshotMismatch + } + match = true + } + default: + state.Reset() + return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) + } + } + + switch w.f { + case nil: + // We do not have to read out all entries in read mode. + // The last record maybe a partial written one, so + // ErrunexpectedEOF might be returned. + if err != io.EOF && err != io.ErrUnexpectedEOF { + state.Reset() + return nil, state, nil, err + } + default: + // We must read all of the entries if WAL is opened in write mode. + if err != io.EOF { + state.Reset() + return nil, state, nil, err + } + } + + err = nil + if !match { + err = ErrSnapshotNotFound + } + + // close decoder, disable reading + w.decoder.close() + w.start = walpb.Snapshot{} + + w.metadata = metadata + + if w.f != nil { + // create encoder (chain crc with the decoder), enable appending + w.encoder = newEncoder(w.f, w.decoder.lastCRC()) + w.decoder = nil + lastIndexSaved.Set(float64(w.enti)) + } + + return metadata, state, ents, err +} + +// cut closes current file written and creates a new one ready to append. +// cut first creates a temp wal file and writes necessary headers into it. +// Then cut atomically rename temp wal file to a wal file. +func (w *WAL) cut() error { + // close old wal file + if err := w.sync(); err != nil { + return err + } + if err := w.f.Close(); err != nil { + return err + } + + fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1)) + ftpath := fpath + ".tmp" + + // create a temp wal file with name sequence + 1, or truncate the existing one + ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + + // update writer and save the previous crc + w.f = ft + prevCrc := w.encoder.crc.Sum32() + w.encoder = newEncoder(w.f, prevCrc) + if err = w.saveCrc(prevCrc); err != nil { + return err + } + if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil { + return err + } + if err = w.saveState(&w.state); err != nil { + return err + } + // close temp wal file + if err = w.sync(); err != nil { + return err + } + if err = w.f.Close(); err != nil { + return err + } + + // atomically move temp wal file to wal file + if err = os.Rename(ftpath, fpath); err != nil { + return err + } + + // open the wal file and update writer again + f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + return err + } + if err = fileutil.Preallocate(f, segmentSizeBytes); err != nil { + plog.Errorf("failed to allocate space when creating new wal file (%v)", err) + return err + } + + w.f = f + prevCrc = w.encoder.crc.Sum32() + w.encoder = newEncoder(w.f, prevCrc) + + // lock the new wal file + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return err + } + + if err := l.Lock(); err != nil { + return err + } + w.locks = append(w.locks, l) + + // increase the wal seq + w.seq++ + + plog.Infof("segmented wal file %v is created", fpath) + return nil +} + +func (w *WAL) sync() error { + if w.encoder != nil { + if err := w.encoder.flush(); err != nil { + return err + } + } + start := time.Now() + err := fileutil.Fdatasync(w.f) + syncDurations.Observe(float64(time.Since(start)) / float64(time.Second)) + return err +} + +// ReleaseLockTo releases the locks, which has smaller index than the given index +// except the largest one among them. +// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release +// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4. +func (w *WAL) ReleaseLockTo(index uint64) error { + w.mu.Lock() + defer w.mu.Unlock() + + var smaller int + found := false + + for i, l := range w.locks { + _, lockIndex, err := parseWalName(path.Base(l.Name())) + if err != nil { + return err + } + if lockIndex >= index { + smaller = i - 1 + found = true + break + } + } + + // if no lock index is greater than the release index, we can + // release lock up to the last one(excluding). + if !found && len(w.locks) != 0 { + smaller = len(w.locks) - 1 + } + + if smaller <= 0 { + return nil + } + + for i := 0; i < smaller; i++ { + w.locks[i].Unlock() + w.locks[i].Destroy() + } + w.locks = w.locks[smaller:] + + return nil +} + +func (w *WAL) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.f != nil { + if err := w.sync(); err != nil { + return err + } + if err := w.f.Close(); err != nil { + return err + } + } + for _, l := range w.locks { + err := l.Unlock() + if err != nil { + plog.Errorf("failed to unlock during closing wal: %s", err) + } + err = l.Destroy() + if err != nil { + plog.Errorf("failed to destroy lock during closing wal: %s", err) + } + } + return nil +} + +func (w *WAL) saveEntry(e *raftpb.Entry) error { + // TODO: add MustMarshalTo to reduce one allocation. + b := pbutil.MustMarshal(e) + rec := &walpb.Record{Type: entryType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + w.enti = e.Index + lastIndexSaved.Set(float64(w.enti)) + return nil +} + +func (w *WAL) saveState(s *raftpb.HardState) error { + if isEmptyHardState(*s) { + return nil + } + w.state = *s + b := pbutil.MustMarshal(s) + rec := &walpb.Record{Type: stateType, Data: b} + return w.encoder.encode(rec) +} + +func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { + w.mu.Lock() + defer w.mu.Unlock() + + // short cut, do not call sync + if isEmptyHardState(st) && len(ents) == 0 { + return nil + } + + mustSync := mustSync(st, w.state, len(ents)) + + // TODO(xiangli): no more reference operator + for i := range ents { + if err := w.saveEntry(&ents[i]); err != nil { + return err + } + } + if err := w.saveState(&st); err != nil { + return err + } + + fstat, err := w.f.Stat() + if err != nil { + return err + } + if fstat.Size() < segmentSizeBytes { + if mustSync { + return w.sync() + } + return nil + } + // TODO: add a test for this code path when refactoring the tests + return w.cut() +} + +func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { + w.mu.Lock() + defer w.mu.Unlock() + + b := pbutil.MustMarshal(&e) + rec := &walpb.Record{Type: snapshotType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + // update enti only when snapshot is ahead of last index + if w.enti < e.Index { + w.enti = e.Index + } + lastIndexSaved.Set(float64(w.enti)) + return w.sync() +} + +func (w *WAL) saveCrc(prevCrc uint32) error { + return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) +} + +func mustSync(st, prevst raftpb.HardState, entsnum int) bool { + // Persistent state on all servers: + // (Updated on stable storage before responding to RPCs) + // currentTerm + // votedFor + // log entries[] + if entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term { + return true + } + return false +} + +func isHardStateEqual(a, b raftpb.HardState) bool { + return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit +} + +var emptyState = raftpb.HardState{} + +func isEmptyHardState(st raftpb.HardState) bool { + return isHardStateEqual(st, emptyState) +} diff --git a/third_party/forked/etcd237/wal/walpb/BUILD b/third_party/forked/etcd237/wal/walpb/BUILD new file mode 100644 index 00000000000..805e0556813 --- /dev/null +++ b/third_party/forked/etcd237/wal/walpb/BUILD @@ -0,0 +1,18 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "record.go", + "record.pb.go", + ], + tags = ["automanaged"], + deps = ["//vendor:github.com/golang/protobuf/proto"], +) diff --git a/third_party/forked/etcd237/wal/walpb/record.go b/third_party/forked/etcd237/wal/walpb/record.go new file mode 100644 index 00000000000..bb536856976 --- /dev/null +++ b/third_party/forked/etcd237/wal/walpb/record.go @@ -0,0 +1,29 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package walpb + +import "errors" + +var ( + ErrCRCMismatch = errors.New("walpb: crc mismatch") +) + +func (rec *Record) Validate(crc uint32) error { + if rec.Crc == crc { + return nil + } + rec.Reset() + return ErrCRCMismatch +} diff --git a/third_party/forked/etcd237/wal/walpb/record.pb.go b/third_party/forked/etcd237/wal/walpb/record.pb.go new file mode 100644 index 00000000000..52bdf56156f --- /dev/null +++ b/third_party/forked/etcd237/wal/walpb/record.pb.go @@ -0,0 +1,495 @@ +// Code generated by protoc-gen-gogo. +// source: record.proto +// DO NOT EDIT! + +/* + Package walpb is a generated protocol buffer package. + + It is generated from these files: + record.proto + + It has these top-level messages: + Record + Snapshot +*/ +package walpb + +import ( + "fmt" + + proto "github.com/golang/protobuf/proto" +) + +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type Record struct { + Type int64 `protobuf:"varint,1,opt,name=type" json:"type"` + Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"` + Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Record) Reset() { *m = Record{} } +func (m *Record) String() string { return proto.CompactTextString(m) } +func (*Record) ProtoMessage() {} + +type Snapshot struct { + Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"` + Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Snapshot) Reset() { *m = Snapshot{} } +func (m *Snapshot) String() string { return proto.CompactTextString(m) } +func (*Snapshot) ProtoMessage() {} + +func init() { + proto.RegisterType((*Record)(nil), "walpb.Record") + proto.RegisterType((*Snapshot)(nil), "walpb.Snapshot") +} +func (m *Record) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Record) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRecord(data, i, uint64(m.Type)) + data[i] = 0x10 + i++ + i = encodeVarintRecord(data, i, uint64(m.Crc)) + if m.Data != nil { + data[i] = 0x1a + i++ + i = encodeVarintRecord(data, i, uint64(len(m.Data))) + i += copy(data[i:], m.Data) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *Snapshot) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Snapshot) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRecord(data, i, uint64(m.Index)) + data[i] = 0x10 + i++ + i = encodeVarintRecord(data, i, uint64(m.Term)) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeFixed64Record(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Record(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintRecord(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *Record) Size() (n int) { + var l int + _ = l + n += 1 + sovRecord(uint64(m.Type)) + n += 1 + sovRecord(uint64(m.Crc)) + if m.Data != nil { + l = len(m.Data) + n += 1 + l + sovRecord(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Snapshot) Size() (n int) { + var l int + _ = l + n += 1 + sovRecord(uint64(m.Index)) + n += 1 + sovRecord(uint64(m.Term)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovRecord(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozRecord(x uint64) (n int) { + return sovRecord(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Record) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Record: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Record: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Type |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Crc", wireType) + } + m.Crc = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Crc |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRecord + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], data[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRecord(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRecord + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Snapshot) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Snapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Snapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) + } + m.Index = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Index |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType) + } + m.Term = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Term |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRecord(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRecord + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipRecord(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthRecord + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipRecord(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthRecord = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRecord = fmt.Errorf("proto: integer overflow") +) diff --git a/third_party/forked/etcd237/wal/walpb/record.proto b/third_party/forked/etcd237/wal/walpb/record.proto new file mode 100644 index 00000000000..b694cb2338a --- /dev/null +++ b/third_party/forked/etcd237/wal/walpb/record.proto @@ -0,0 +1,20 @@ +syntax = "proto2"; +package walpb; + +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +message Record { + optional int64 type = 1 [(gogoproto.nullable) = false]; + optional uint32 crc = 2 [(gogoproto.nullable) = false]; + optional bytes data = 3; +} + +message Snapshot { + optional uint64 index = 1 [(gogoproto.nullable) = false]; + optional uint64 term = 2 [(gogoproto.nullable) = false]; +}