diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 1f70c7db167..ea146ddf497 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -20,7 +20,6 @@ import ( "strings" "testing" - etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -140,7 +139,7 @@ type FailDeletionStorage struct { func (f FailDeletionStorage) Delete(ctx context.Context, key string, out runtime.Object) error { *f.Called = true - return etcd.Error{Code: etcd.ErrorCodeKeyNotFound} + return storage.NewKeyNotFoundError(key, 0) } func newFailDeleteStorage(t *testing.T, called *bool) (*REST, *etcdtesting.EtcdTestServer) { diff --git a/pkg/storage/errors.go b/pkg/storage/errors.go index 8fe1df643d2..64f2f716996 100644 --- a/pkg/storage/errors.go +++ b/pkg/storage/errors.go @@ -16,30 +16,91 @@ limitations under the License. package storage -import ( - etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" +import "fmt" + +const ( + ErrCodeKeyNotFound int = iota + 1 + ErrCodeKeyExists + ErrCodeResourceVersionConflicts + ErrCodeUnreachable ) +var errCodeToMessage = map[int]string{ + ErrCodeKeyNotFound: "key not found", + ErrCodeKeyExists: "key exists", + ErrCodeResourceVersionConflicts: "resource version conflicts", + ErrCodeUnreachable: "server unreachable", +} + +func NewKeyNotFoundError(key string, rv int64) *StorageError { + return &StorageError{ + Code: ErrCodeKeyNotFound, + Key: key, + ResourceVersion: rv, + } +} + +func NewKeyExistsError(key string, rv int64) *StorageError { + return &StorageError{ + Code: ErrCodeKeyExists, + Key: key, + ResourceVersion: rv, + } +} + +func NewResourceVersionConflictsError(key string, rv int64) *StorageError { + return &StorageError{ + Code: ErrCodeResourceVersionConflicts, + Key: key, + ResourceVersion: rv, + } +} + +func NewUnreachableError(key string, rv int64) *StorageError { + return &StorageError{ + Code: ErrCodeUnreachable, + Key: key, + ResourceVersion: rv, + } +} + +type StorageError struct { + Code int + Key string + ResourceVersion int64 +} + +func (e *StorageError) Error() string { + return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d", + errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion) +} + // IsNotFound returns true if and only if err is "key" not found error. func IsNotFound(err error) bool { - // TODO: add alternate storage error here - return etcdutil.IsEtcdNotFound(err) + return isErrCode(err, ErrCodeKeyNotFound) } // IsNodeExist returns true if and only if err is an node already exist error. func IsNodeExist(err error) bool { - // TODO: add alternate storage error here - return etcdutil.IsEtcdNodeExist(err) + return isErrCode(err, ErrCodeKeyExists) } // IsUnreachable returns true if and only if err indicates the server could not be reached. func IsUnreachable(err error) bool { - // TODO: add alternate storage error here - return etcdutil.IsEtcdUnreachable(err) + return isErrCode(err, ErrCodeUnreachable) } // IsTestFailed returns true if and only if err is a write conflict. func IsTestFailed(err error) bool { - // TODO: add alternate storage error here - return etcdutil.IsEtcdTestFailed(err) + return isErrCode(err, ErrCodeResourceVersionConflicts) +} + +func isErrCode(err error, code int) bool { + if err == nil { + return false + } + if e, ok := err.(*StorageError); ok { + return e.Code == code + } + return false } diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 469b2ed88f1..c86fe7ac7d8 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -217,7 +217,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) trace.Step("Object created") if err != nil { - return err + return toStorageErr(err, key, 0) } if out != nil { if _, err := conversion.EnforcePtr(out); err != nil { @@ -264,7 +264,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) if err != nil { - return err + return toStorageErr(err, key, int64(version)) } } if create { @@ -276,7 +276,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec } response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts) if err != nil { - return err + return toStorageErr(err, key, 0) } metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) } @@ -310,7 +310,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) _, _, err = h.extractObj(response, err, out, false, true) } } - return err + return toStorageErr(err, key, 0) } // Implements storage.Interface. @@ -367,12 +367,11 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r response, err := h.etcdKeysAPI.Get(ctx, key, opts) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) - if err != nil && !etcdutil.IsEtcdNotFound(err) { - return "", nil, nil, err + return "", nil, nil, toStorageErr(err, key, 0) } body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) - return body, node, response, err + return body, node, response, toStorageErr(err, key, 0) } func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) { @@ -434,7 +433,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F if etcdutil.IsEtcdNotFound(err) { return nil } - return err + return toStorageErr(err, key, 0) } nodes := make([]*etcd.Node, 0) @@ -541,7 +540,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node if etcdutil.IsEtcdNotFound(err) { return nodes, index, nil } else { - return nodes, index, err + return nodes, index, toStorageErr(err, key, 0) } } return result.Node.Nodes, result.Index, nil @@ -623,7 +622,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType continue } _, _, err = h.extractObj(response, err, ptrToType, false, false) - return err + return toStorageErr(err, key, 0) } if string(data) == origBody { @@ -647,7 +646,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType continue } _, _, err = h.extractObj(response, err, ptrToType, false, false) - return err + return toStorageErr(err, key, int64(index)) } } @@ -712,3 +711,21 @@ func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) { metrics.ObserveNewEntry() } } + +func toStorageErr(err error, key string, rv int64) error { + if err == nil { + return nil + } + switch { + case etcdutil.IsEtcdNotFound(err): + return storage.NewKeyNotFoundError(key, rv) + case etcdutil.IsEtcdNodeExist(err): + return storage.NewKeyExistsError(key, rv) + case etcdutil.IsEtcdTestFailed(err): + return storage.NewResourceVersionConflictsError(key, rv) + case etcdutil.IsEtcdUnreachable(err): + return storage.NewUnreachableError(key, rv) + default: + return err + } +} diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 5df9a63d5a4..ca02a18f77f 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -35,7 +35,6 @@ import ( "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" - etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" storagetesting "k8s.io/kubernetes/pkg/storage/testing" ) @@ -251,7 +250,7 @@ func TestGetNotFoundErr(t *testing.T) { var got api.Pod err := helper.Get(context.TODO(), boguskey, &got, false) - if !etcdutil.IsEtcdNotFound(err) { + if !storage.IsNotFound(err) { t.Errorf("Unexpected reponse on key=%v, err=%v", key, err) } } diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index d9bfd3722ff..eff5ed6e6c1 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -221,7 +221,7 @@ func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key stri if err != nil { if !etcdutil.IsEtcdNotFound(err) { utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err)) - return resourceVersion, err + return resourceVersion, toStorageErr(err, key, 0) } if etcdError, ok := err.(etcd.Error); ok { resourceVersion = etcdError.Index